blob: 75c48ce9508ff88920be9795c1fad657e95232d3 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
The gRPC client layer for the OpenFlow agent
"""
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070020from Queue import Queue, Empty
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070021
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070022from grpc import StatusCode
23from grpc._channel import _Rendezvous
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070024from structlog import get_logger
25from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070027from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070028
Zsolt Haraszti66862032016-11-28 14:28:39 -080029from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, \
30 FlowGroupTableUpdate, PacketOut
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080031from google.protobuf import empty_pb2
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070032
33
# Module-level structlog logger shared by all code in this module.
log = get_logger()
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070035
36
class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent to talk to the Voltha local
    service.

    Three long-lived streams are serviced on reactor pool threads:

    * packet-out: messages placed on ``packet_out_queue`` are streamed to
      the server through ``StreamPacketsOut``;
    * packet-in: messages received from ``ReceivePacketsIn`` are put on
      ``packet_in_queue`` and handed to the connection manager;
    * change events: messages received from ``ReceiveChangeEvents`` are
      put on ``change_event_queue`` and handed to the connection manager.

    The remaining methods are unary RPCs executed via ``deferToThread``;
    each returns a Deferred.
    """

    def __init__(self, connection_manager, channel):
        """
        :param connection_manager: object receiving ``forward_packet_in``
            and ``forward_change_event`` calls.
        :param channel: gRPC channel used to build the local service stub.
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.local_stub = VolthaLocalServiceStub(channel)

        # Polled by every worker loop; set by stop().
        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """Start all streams and processing loops; return ``self``."""
        log.debug('starting')
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """
        Ask all worker loops to terminate by raising the ``stopped`` flag.

        Nothing is joined or cancelled here. The streaming threads notice
        the flag on their next (roughly one second) timeout, and the two
        reactor loops notice it after handling their next queued item.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    def start_packet_out_stream(self):
        """Stream queued PacketOut messages to the server on a pool thread."""

        def packet_generator():
            while True:
                try:
                    # A bounded wait lets the generator observe
                    # self.stopped even when no packets are being sent.
                    packet = self.packet_out_queue.get(block=True,
                                                       timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            self.local_stub.StreamPacketsOut(packet_generator())

        reactor.callInThread(stream_packets_out)

    def _start_inbound_stream(self, streaming_rpc_method, queue,
                              log_event, item_key):
        """
        Run a server-streaming RPC on a pool thread, feeding ``queue``.

        :param streaming_rpc_method: stub method, invoked with an Empty
            request and a one second timeout.
        :param queue: DeferredQueue that receives every streamed message.
        :param log_event: event name for the per-message debug log.
        :param item_key: keyword under which the message is logged.
        """

        def receive():
            while not self.stopped:
                try:
                    # The call must be re-issued on every pass: once a
                    # call has ended with DEADLINE_EXCEEDED, iterating it
                    # again fails immediately, so reusing it would
                    # busy-spin and never deliver another message.
                    iterator = streaming_rpc_method(empty_pb2.Empty(),
                                                    timeout=1.0)
                    for message in iterator:
                        # DeferredQueue belongs to the reactor thread.
                        reactor.callFromThread(queue.put, message)
                        fields = {
                            item_key: message,
                            'queue_len': len(queue.pending),
                        }
                        log.debug(log_event, **fields)
                except _Rendezvous as e:
                    # The deadline only exists so self.stopped gets polled.
                    if e.code() == StatusCode.DEADLINE_EXCEEDED:
                        continue
                    raise

        reactor.callInThread(receive)

    def start_packet_in_stream(self):
        """Receive PacketIn messages into ``packet_in_queue``."""
        self._start_inbound_stream(self.local_stub.ReceivePacketsIn,
                                   self.packet_in_queue,
                                   'enqued-packet-in',
                                   'packet_in')

    def start_change_event_in_stream(self):
        """Receive change events into ``change_event_queue``."""
        self._start_inbound_stream(self.local_stub.ReceiveChangeEvents,
                                   self.change_event_queue,
                                   'enqued-change-event',
                                   'change_event')

    @inlineCallbacks
    def change_event_processing_loop(self):
        """
        Forward queued change events to the connection manager until
        stopped.

        A failure while handling one event is logged and does not end the
        loop.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """
        Forward queued PacketIn messages to the connection manager until
        stopped.

        A failure while handling one packet is logged and does not end the
        loop, matching change_event_processing_loop.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Wrap ``packet_out`` in a PacketOut message and queue it for sending."""
        message = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(message)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Fire with the ``items`` of the ListLogicalDevicePorts response."""
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Fire with the GetLogicalDevice response for ``device_id``."""
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Send ``flow_mod`` via UpdateLogicalDeviceFlowTable; fire with the response."""
        req = FlowTableUpdate(
            id=device_id,
            flow_mod=flow_mod
        )
        res = yield threads.deferToThread(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Send ``group_mod`` via UpdateLogicalDeviceFlowGroupTable; fire with the response."""
        req = FlowGroupTableUpdate(
            id=device_id,
            group_mod=group_mod
        )
        res = yield threads.deferToThread(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Fire with the ``items`` of the ListLogicalDeviceFlows response."""
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Fire with the ``items`` of the ListLogicalDeviceFlowGroups response."""
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)