blob: efbc038cb956b01409b9a535d30521489e12e502 [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
The gRPC client layer for the OpenFlow agent
"""
from Queue import Queue
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from protos.voltha_pb2 import ID, VolthaLogicalLayerStub, FlowTableUpdate, \
GroupTableUpdate, NullMessage, PacketOut
log = get_logger()
class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent.

    Wraps a VolthaLogicalLayerStub built on the supplied channel. Unary
    RPCs are executed with threads.deferToThread and exposed as Deferreds;
    the two packet streams are driven from reactor thread-pool threads
    (reactor.callInThread) and coupled to the reactor through queues.
    """

    def __init__(self, connection_manager, channel):
        """
        Create the stub, start both packet streams and schedule the
        packet-in forwarder loop.

        :param connection_manager: object whose
            forward_packet_in(device_id, ofp_packet_in) is invoked for
            every packet-in received from the stream
        :param channel: gRPC channel passed to VolthaLogicalLayerStub
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.logical_stub = VolthaLogicalLayerStub(channel)

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn

        self.start_packet_out_stream()
        self.start_packet_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)

    def start_packet_out_stream(self):
        """
        Start a worker thread that feeds StreamPacketsOut with the messages
        placed on packet_out_queue by send_packet_out().
        """

        def packet_generator():
            while 1:
                # NOTE(review): this get() has no timeout, so the generator
                # (and the worker thread consuming it) never finishes on
                # its own -- confirm how shutdown is expected to work.
                packet = self.packet_out_queue.get(block=True)
                yield packet

        def stream_packets_out():
            generator = packet_generator()
            self.logical_stub.StreamPacketsOut(generator)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """
        Start a worker thread that iterates ReceivePacketsIn and hands every
        received message over to the reactor thread, where it is placed on
        packet_in_queue.
        """

        def enqueue_packet_in(packet_in):
            # Executed in the reactor thread (via callFromThread), so the
            # DeferredQueue is only ever touched from that thread and the
            # logged length reflects the state right after this put().
            self.packet_in_queue.put(packet_in)
            log.debug('enqued-packet-in',
                      packet_in=packet_in,
                      queue_len=len(self.packet_in_queue.pending))

        def receive_packet_in_stream():
            stream = self.logical_stub.ReceivePacketsIn(NullMessage())
            for packet_in in stream:
                reactor.callFromThread(enqueue_packet_in, packet_in)

        reactor.callInThread(receive_packet_in_stream)

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """
        Take packet-in messages off packet_in_queue, one at a time, and pass
        them to connection_manager.forward_packet_in().

        A failure while forwarding one message is logged and the loop moves
        on to the next message, so a single bad packet does not stop all
        subsequent packet-in forwarding.
        """
        while True:
            packet_in = yield self.packet_in_queue.get()
            try:
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(
                    device_id, ofp_packet_in)
            except Exception as e:
                log.exception('failed-to-forward-packet-in', e=e)

    def send_packet_out(self, device_id, packet_out):
        """
        Wrap packet_out in a PacketOut message addressed to device_id and
        queue it for the outgoing packet stream.
        """
        message = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(message)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """
        Call ListLogicalDevicePorts for device_id.

        :return: Deferred firing with the `items` field of the response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.logical_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """
        Call GetLogicalDevice for device_id.

        :return: Deferred firing with the RPC response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.logical_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """
        Send flow_mod for device_id using the UpdateFlowTable RPC.

        :return: Deferred firing with the RPC response
        """
        req = FlowTableUpdate(
            id=device_id,
            flow_mod=flow_mod
        )
        res = yield threads.deferToThread(
            self.logical_stub.UpdateFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """
        Send group_mod for device_id using the UpdateGroupTable RPC.

        :return: Deferred firing with the RPC response
        """
        req = GroupTableUpdate(
            id=device_id,
            group_mod=group_mod
        )
        res = yield threads.deferToThread(
            self.logical_stub.UpdateGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """
        Call ListDeviceFlows for device_id.

        :return: Deferred firing with the `items` field of the response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.logical_stub.ListDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """
        Call ListDeviceFlowGroups for device_id.

        :return: Deferred firing with the `items` field of the response
        """
        req = ID(id=device_id)
        res = yield threads.deferToThread(
            self.logical_stub.ListDeviceFlowGroups, req)
        returnValue(res.items)