blob: ccdb98bc2c504a68a5398d51ee15806627d366a9 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070020from Queue import Queue, Empty
alshabib06b449c2017-01-15 17:33:16 -060021import os
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070022
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070023from grpc import StatusCode
24from grpc._channel import _Rendezvous
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070025from structlog import get_logger
26from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070027from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070028from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029
Zsolt Haraszti66862032016-11-28 14:28:39 -080030from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, \
31 FlowGroupTableUpdate, PacketOut
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080032from google.protobuf import empty_pb2
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070033
34
35log = get_logger()
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070036
37
class GrpcClient(object):
    """
    gRPC client layer used by the OpenFlow agent.

    Wraps a VolthaLocalServiceStub built on the supplied channel and
    bridges it to the Twisted reactor:

    * three long-lived streaming RPCs (packet-out, packet-in and change
      events) each run in a reactor thread-pool thread;
    * items received on the inbound streams are handed to the reactor
      thread through DeferredQueues and forwarded to the connection
      manager by two inlineCallbacks loops;
    * the remaining unary RPCs are run with deferToThread and return
      Deferreds.
    """

    def __init__(self, connection_manager, channel):
        """
        :param connection_manager: object receiving inbound traffic; this
            class calls its forward_packet_in() and forward_change_event()
        :param channel: gRPC channel the local service stub is built on
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.local_stub = VolthaLocalServiceStub(channel)

        # Set by stop(); polled by the generator and the forwarding loops.
        self.stopped = False

        # Blocking, thread-safe queue: filled by send_packet_out() and
        # drained from the packet-out streaming thread.
        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        # DeferredQueues are filled from the streaming threads (via
        # reactor.callFromThread) and drained on the reactor thread.
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """
        Start the three streaming RPC threads and schedule the two
        forwarding loops on the reactor.

        :return: self, so the call can be chained
        """
        log.debug('starting')
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """
        Flag the client as stopped.

        Nothing is interrupted actively: the packet-out generator notices
        the flag only after its queue read times out (1 second), and each
        forwarding loop notices it only after handling its next item.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    def _handle_stream_error(self, stream_name, error):
        """
        Common reaction to a streaming RPC that ended with a gRPC error.

        UNAVAILABLE makes the process send SIGTERM to itself (presumably so
        that it shuts down and can be restarted by its supervisor -- confirm
        against the deployment). Any other status code is logged instead of
        being dropped silently.

        :param stream_name: short label of the failed stream, for logging
        :param error: the caught _Rendezvous; must provide code()
        """
        # Imported locally so this class does not depend on a new
        # file-level import.
        import signal

        code = error.code()
        if code != StatusCode.UNAVAILABLE:
            log.error('grpc-stream-error', stream=stream_name, code=code)
            return

        log.error('grpc-endpoint-unavailable', stream=stream_name)
        # Signal ourselves directly rather than through os.system("kill"),
        # which would spawn a shell subprocess from a worker thread.
        os.kill(os.getpid(), signal.SIGTERM)

    def start_packet_out_stream(self):
        """
        Launch the thread that streams queued PacketOut messages to the
        local service through the StreamPacketsOut RPC.
        """

        def packet_generator():
            # Feed the RPC from the blocking queue. The 1 second timeout
            # exists only so that the stopped flag gets re-checked while
            # the queue is idle.
            while True:
                try:
                    packet = self.packet_out_queue.get(
                        block=True, timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            try:
                self.local_stub.StreamPacketsOut(packet_generator())
            except _Rendezvous as e:
                self._handle_stream_error('packet-out', e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """
        Launch the thread that consumes the ReceivePacketsIn RPC stream and
        enqueues every received message on packet_in_queue.
        """

        def receive_packet_in_stream():
            iterator = self.local_stub.ReceivePacketsIn(empty_pb2.Empty())
            try:
                for packet_in in iterator:
                    # DeferredQueue must be touched on the reactor thread.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error('packet-in', e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """
        Launch the thread that consumes the ReceiveChangeEvents RPC stream
        and enqueues every received event on change_event_queue.
        """

        def receive_change_events():
            iterator = self.local_stub.ReceiveChangeEvents(empty_pb2.Empty())
            try:
                for event in iterator:
                    # DeferredQueue must be touched on the reactor thread.
                    reactor.callFromThread(self.change_event_queue.put,
                                           event)
                    log.debug('enqued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error('change-event', e)

        reactor.callInThread(receive_change_events)

    @inlineCallbacks
    def change_event_processing_loop(self):
        """
        Forward change events from change_event_queue to the connection
        manager, one at a time, until the client is stopped.

        A failure while handling one event is logged and does not end the
        loop.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """
        Forward packet-in messages from packet_in_queue to the connection
        manager, one at a time, until the client is stopped.

        A failure while handling one packet is logged and does not end the
        loop, matching change_event_processing_loop.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(
                    device_id, ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """
        Wrap an OpenFlow packet-out in a PacketOut message tagged with
        device_id and queue it for the packet-out stream.
        """
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """
        Call ListLogicalDevicePorts for device_id in a worker thread.

        :return: Deferred firing with the `items` field of the response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """
        Call GetLogicalDevice for device_id in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """
        Send flow_mod for device_id through UpdateLogicalDeviceFlowTable,
        in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = FlowTableUpdate(
            id=device_id,
            flow_mod=flow_mod
        )
        res = yield threads.deferToThread(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """
        Send group_mod for device_id through
        UpdateLogicalDeviceFlowGroupTable, in a worker thread.

        :return: Deferred firing with the RPC response
        """
        req = FlowGroupTableUpdate(
            id=device_id,
            group_mod=group_mod
        )
        res = yield threads.deferToThread(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """
        Call ListLogicalDeviceFlows for device_id in a worker thread.

        :return: Deferred firing with the `items` field of the response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """
        Call ListLogicalDeviceFlowGroups for device_id in a worker thread.

        :return: Deferred firing with the `items` field of the response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)