blob: a4475b559a2f5e253b148e0f220df46697ba982b [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070020from Queue import Queue, Empty
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070021
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070022from grpc import StatusCode
23from grpc._channel import _Rendezvous
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070024from structlog import get_logger
25from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070027from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070028
Zsolt Haraszti8a774382016-10-24 18:25:54 -070029from protos.voltha_pb2 import ID, VolthaLogicalLayerStub, FlowTableUpdate, \
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080030 GroupTableUpdate, PacketOut
31from google.protobuf import empty_pb2
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070032
33
34log = get_logger()
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070035
36
37class GrpcClient(object):
38
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070039 def __init__(self, connection_manager, channel):
40
41 self.connection_manager = connection_manager
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070042 self.channel = channel
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070043 self.logical_stub = VolthaLogicalLayerStub(channel)
44
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070045 self.stopped = False
46
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070047 self.packet_out_queue = Queue() # queue to send out PacketOut msgs
48 self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070049
50 def start(self):
51 log.debug('starting')
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070052 self.start_packet_out_stream()
53 self.start_packet_in_stream()
54 reactor.callLater(0, self.packet_in_forwarder_loop)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070055 log.info('started')
56 return self
57
58 def stop(self):
59 log.debug('stopping')
60 self.stopped = True
61 log.info('stopped')
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070062
63 def start_packet_out_stream(self):
64
65 def packet_generator():
66 while 1:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070067 try:
68 packet = self.packet_out_queue.get(block=True, timeout=1.0)
69 except Empty:
70 if self.stopped:
71 return
72 else:
73 yield packet
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070074
75 def stream_packets_out():
76 generator = packet_generator()
77 self.logical_stub.StreamPacketsOut(generator)
78
79 reactor.callInThread(stream_packets_out)
80
81 def start_packet_in_stream(self):
82
83 def receive_packet_in_stream():
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070084 streaming_rpc_method = self.logical_stub.ReceivePacketsIn
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080085 iterator = streaming_rpc_method(empty_pb2.Empty())
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070086 for packet_in in iterator:
87 reactor.callFromThread(self.packet_in_queue.put,
88 packet_in)
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070089 log.debug('enqued-packet-in',
90 packet_in=packet_in,
91 queue_len=len(self.packet_in_queue.pending))
92
93 reactor.callInThread(receive_packet_in_stream)
94
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070095 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070096 def packet_in_forwarder_loop(self):
97 while True:
98 packet_in = yield self.packet_in_queue.get()
99 device_id = packet_in.id
100 ofp_packet_in = packet_in.packet_in
101 self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700102 if self.stopped:
103 break
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700104
105 def send_packet_out(self, device_id, packet_out):
106 packet_out = PacketOut(id=device_id, packet_out=packet_out)
107 self.packet_out_queue.put(packet_out)
108
109 @inlineCallbacks
110 def get_port_list(self, device_id):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700111 req = ID(id=device_id)
112 res = yield threads.deferToThread(
113 self.logical_stub.ListLogicalDevicePorts, req)
114 returnValue(res.items)
115
116 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700117 def get_device_info(self, device_id):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700118 req = ID(id=device_id)
119 res = yield threads.deferToThread(
120 self.logical_stub.GetLogicalDevice, req)
121 returnValue(res)
122
123 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700124 def update_flow_table(self, device_id, flow_mod):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700125 req = FlowTableUpdate(
126 id=device_id,
127 flow_mod=flow_mod
128 )
129 res = yield threads.deferToThread(
130 self.logical_stub.UpdateFlowTable, req)
131 returnValue(res)
132
133 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700134 def update_group_table(self, device_id, group_mod):
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700135 req = GroupTableUpdate(
136 id=device_id,
137 group_mod=group_mod
138 )
139 res = yield threads.deferToThread(
140 self.logical_stub.UpdateGroupTable, req)
141 returnValue(res)
142
143 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700144 def list_flows(self, device_id):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700145 req = ID(id=device_id)
146 res = yield threads.deferToThread(
147 self.logical_stub.ListDeviceFlows, req)
148 returnValue(res.items)
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700149
150 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700151 def list_groups(self, device_id):
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700152 req = ID(id=device_id)
153 res = yield threads.deferToThread(
154 self.logical_stub.ListDeviceFlowGroups, req)
155 returnValue(res.items)