blob: efbc038cb956b01409b9a535d30521489e12e502 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070020from Queue import Queue
21
22from structlog import get_logger
23from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070024from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070025from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026
Zsolt Haraszti8a774382016-10-24 18:25:54 -070027from protos.voltha_pb2 import ID, VolthaLogicalLayerStub, FlowTableUpdate, \
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070028 GroupTableUpdate, NullMessage, PacketOut
29
30
31log = get_logger()
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070032
33
34class GrpcClient(object):
35
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070036 def __init__(self, connection_manager, channel):
37
38 self.connection_manager = connection_manager
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070039 self.channel = channel
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070040 self.logical_stub = VolthaLogicalLayerStub(channel)
41
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070042 self.packet_out_queue = Queue() # queue to send out PacketOut msgs
43 self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
44 self.start_packet_out_stream()
45 self.start_packet_in_stream()
46 reactor.callLater(0, self.packet_in_forwarder_loop)
47
48 def start_packet_out_stream(self):
49
50 def packet_generator():
51 while 1:
52 packet = self.packet_out_queue.get(block=True)
53 yield packet
54
55 def stream_packets_out():
56 generator = packet_generator()
57 self.logical_stub.StreamPacketsOut(generator)
58
59 reactor.callInThread(stream_packets_out)
60
61 def start_packet_in_stream(self):
62
63 def receive_packet_in_stream():
64 for packet_in in self.logical_stub.ReceivePacketsIn(NullMessage()):
65 reactor.callFromThread(self.packet_in_queue.put, packet_in)
66 log.debug('enqued-packet-in',
67 packet_in=packet_in,
68 queue_len=len(self.packet_in_queue.pending))
69
70 reactor.callInThread(receive_packet_in_stream)
71
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070072 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070073 def packet_in_forwarder_loop(self):
74 while True:
75 packet_in = yield self.packet_in_queue.get()
76 device_id = packet_in.id
77 ofp_packet_in = packet_in.packet_in
78 self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
79
80 def send_packet_out(self, device_id, packet_out):
81 packet_out = PacketOut(id=device_id, packet_out=packet_out)
82 self.packet_out_queue.put(packet_out)
83
84 @inlineCallbacks
85 def get_port_list(self, device_id):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070086 req = ID(id=device_id)
87 res = yield threads.deferToThread(
88 self.logical_stub.ListLogicalDevicePorts, req)
89 returnValue(res.items)
90
91 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070092 def get_device_info(self, device_id):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070093 req = ID(id=device_id)
94 res = yield threads.deferToThread(
95 self.logical_stub.GetLogicalDevice, req)
96 returnValue(res)
97
98 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070099 def update_flow_table(self, device_id, flow_mod):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700100 req = FlowTableUpdate(
101 id=device_id,
102 flow_mod=flow_mod
103 )
104 res = yield threads.deferToThread(
105 self.logical_stub.UpdateFlowTable, req)
106 returnValue(res)
107
108 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700109 def update_group_table(self, device_id, group_mod):
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700110 req = GroupTableUpdate(
111 id=device_id,
112 group_mod=group_mod
113 )
114 res = yield threads.deferToThread(
115 self.logical_stub.UpdateGroupTable, req)
116 returnValue(res)
117
118 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700119 def list_flows(self, device_id):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700120 req = ID(id=device_id)
121 res = yield threads.deferToThread(
122 self.logical_stub.ListDeviceFlows, req)
123 returnValue(res.items)
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700124
125 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700126 def list_groups(self, device_id):
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700127 req = ID(id=device_id)
128 res = yield threads.deferToThread(
129 self.logical_stub.ListDeviceFlowGroups, req)
130 returnValue(res.items)