#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
The gRPC client layer for the OpenFlow agent
"""
import os
import signal
from Queue import Queue, Empty

from google.protobuf import empty_pb2
from grpc import StatusCode
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue

from protos.logical_device_pb2 import LogicalPortId
from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, \
    FlowGroupTableUpdate, PacketOut

# Module-level structlog logger used by every method of GrpcClient below.
log = get_logger()


class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent to talk to the local Voltha
    service through a ``VolthaLocalServiceStub``.

    Three streaming RPCs are driven from reactor pool threads
    (``StreamPacketsOut``, ``ReceivePacketsIn``, ``ReceiveChangeEvents``);
    their traffic is bridged to the reactor thread through queues. All
    other RPCs are unary calls wrapped with ``threads.deferToThread`` so
    that callers get a Deferred and the reactor is never blocked.
    """

    def __init__(self, connection_manager, channel):
        """
        :param connection_manager: object receiving inbound traffic via its
            ``forward_packet_in(device_id, ofp_packet_in)`` and
            ``forward_change_event(device_id, event)`` methods
        :param channel: gRPC channel the service stub is built on
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.local_stub = VolthaLocalServiceStub(channel)

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """
        Launch the three streaming RPC threads and schedule the two
        reactor-side consumer loops.

        :return: self, so the call can be chained after construction
        """
        log.debug('starting')
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """
        Flag the client as stopped.

        The packet-out generator and the consumer loops poll this flag:
        the generator exits on its next queue timeout, each consumer loop
        exits after the next item it dequeues.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    def _handle_stream_error(self, stream, error):
        """
        Common reaction to a gRPC error raised by one of the streaming RPCs.

        An UNAVAILABLE status sends SIGTERM to our own process, as the
        original code did through a shell ``kill -15``. Any other status
        is logged instead of being dropped silently.

        :param stream: short name of the failing stream, used for logging
        :param error: the caught ``_Rendezvous`` exception
        """
        code = error.code()
        if code == StatusCode.UNAVAILABLE:
            log.error('grpc-stream-unavailable', stream=stream)
            # Signal ourselves directly rather than spawning a shell.
            os.kill(os.getpid(), signal.SIGTERM)
        else:
            log.error('grpc-stream-failed', stream=stream, code=code,
                      e=error)

    def start_packet_out_stream(self):
        """
        Start a pool thread that streams queued PacketOut messages to the
        service via ``StreamPacketsOut``.
        """

        def packet_generator():
            # Poll with a timeout so that the stopped flag is noticed even
            # when no packets are being queued.
            while True:
                try:
                    packet = self.packet_out_queue.get(block=True,
                                                       timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.local_stub.StreamPacketsOut(generator)
            except _Rendezvous as e:
                self._handle_stream_error('packet-out', e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """
        Start a pool thread that reads the ``ReceivePacketsIn`` stream and
        hands every message over to the reactor-side packet-in queue.
        """

        def receive_packet_in_stream():
            streaming_rpc_method = self.local_stub.ReceivePacketsIn
            iterator = streaming_rpc_method(empty_pb2.Empty())
            try:
                for packet_in in iterator:
                    # DeferredQueue must only be touched from the reactor
                    # thread, hence the callFromThread hop.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error('packet-in', e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """
        Start a pool thread that reads the ``ReceiveChangeEvents`` stream
        and hands every event over to the reactor-side change-event queue.
        """

        def receive_change_events():
            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
            iterator = streaming_rpc_method(empty_pb2.Empty())
            try:
                for event in iterator:
                    reactor.callFromThread(self.change_event_queue.put,
                                           event)
                    log.debug('enqued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error('change-event', e)

        reactor.callInThread(receive_change_events)

    @inlineCallbacks
    def change_event_processing_loop(self):
        """
        Reactor-side loop: dequeue change events and pass them to the
        connection manager until the client is stopped.

        A failure while handling one event is logged and does not end the
        loop.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """
        Reactor-side loop: dequeue packet-in messages and pass them to the
        connection manager until the client is stopped.

        A failure while handling one packet is logged and does not end the
        loop, matching change_event_processing_loop.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """
        Wrap an OpenFlow packet-out in a ``PacketOut`` message and queue it
        for the outbound stream.

        :param device_id: id placed in the PacketOut ``id`` field
        :param packet_out: payload placed in the ``packet_out`` field
        """
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """
        Call ``GetLogicalDevicePort`` in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield threads.deferToThread(
            self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """
        Call ``ListLogicalDevicePorts`` in a worker thread.

        :return: Deferred firing with the ``items`` of the RPC response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """
        Call ``EnableLogicalDevicePort`` in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = LogicalPortId(
            id=device_id,
            port_id=port_id
        )
        res = yield threads.deferToThread(
            self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """
        Call ``DisableLogicalDevicePort`` in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = LogicalPortId(
            id=device_id,
            port_id=port_id
        )
        res = yield threads.deferToThread(
            self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """
        Call ``GetLogicalDevice`` in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """
        Call ``UpdateLogicalDeviceFlowTable`` in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = FlowTableUpdate(
            id=device_id,
            flow_mod=flow_mod
        )
        res = yield threads.deferToThread(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """
        Call ``UpdateLogicalDeviceFlowGroupTable`` in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = FlowGroupTableUpdate(
            id=device_id,
            group_mod=group_mod
        )
        res = yield threads.deferToThread(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """
        Call ``ListLogicalDeviceFlows`` in a worker thread.

        :return: Deferred firing with the ``items`` of the RPC response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """
        Call ``ListLogicalDeviceFlowGroups`` in a worker thread.

        :return: Deferred firing with the ``items`` of the RPC response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    def list_ports(self, device_id):
        """
        Alias of get_port_list, kept for callers using this name.

        :return: Deferred firing with the ``items`` of the
            ``ListLogicalDevicePorts`` response
        """
        return self.get_port_list(device_id)