VOL-670: Implement OF port_mod handling for logical devices.
Change-Id: I882790f7d70ec812ba6adc0d01e128ed6a98cbbf
diff --git a/ofagent/agent.py b/ofagent/agent.py
index cb59d24..6117c8f 100644
--- a/ofagent/agent.py
+++ b/ofagent/agent.py
@@ -18,17 +18,13 @@
import structlog
import os.path
-from twisted.internet import protocol
-from twisted.internet import reactor
-from twisted.internet import reactor, ssl
-from twisted.internet import reactor
+from twisted.internet import protocol, reactor, ssl
from twisted.internet.defer import Deferred, inlineCallbacks
import loxi.of13 as of13
from common.utils.asleep import asleep
from of_connection import OpenFlowConnection
from of_protocol_handler import OpenFlowProtocolHandler
-# from ofagent.protos.openflow_13_pb2 import ChangeEvent
log = structlog.get_logger()
@@ -208,4 +204,3 @@
reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
reactor.run()
-
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index ccdb98b..1214507 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -28,7 +28,7 @@
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, \
- FlowGroupTableUpdate, PacketOut
+ FlowGroupTableUpdate, PacketOut, LogicalPortId
from google.protobuf import empty_pb2
@@ -148,6 +148,13 @@
self.packet_out_queue.put(packet_out)
@inlineCallbacks
+ def get_port(self, device_id, port_id):
+ req = LogicalPortId(id=device_id, port_id=port_id)
+ res = yield threads.deferToThread(
+ self.local_stub.GetLogicalDevicePort, req)
+ returnValue(res)
+
+ @inlineCallbacks
def get_port_list(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
@@ -155,6 +162,26 @@
returnValue(res.items)
@inlineCallbacks
+ def enable_port(self, device_id, port_id):
+ req = LogicalPortId(
+ id=device_id,
+ port_id=port_id
+ )
+ res = yield threads.deferToThread(
+ self.local_stub.EnableLogicalDevicePort, req)
+ returnValue(res)
+
+ @inlineCallbacks
+ def disable_port(self, device_id, port_id):
+ req = LogicalPortId(
+ id=device_id,
+ port_id=port_id
+ )
+ res = yield threads.deferToThread(
+ self.local_stub.DisableLogicalDevicePort, req)
+ returnValue(res)
+
+ @inlineCallbacks
def get_device_info(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index 1780244..d6aa92d 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -178,8 +178,20 @@
# https://jira.opencord.org/browse/CORD-826
pass
+ @inlineCallbacks
def handle_port_mod_request(self, req):
- raise NotImplementedError()
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ port = yield self.rpc.get_port(self.device_id, str(req.port_no))
+
+ if port.ofp_port.config & ofp.OFPPC_PORT_DOWN != \
+ req.config & ofp.OFPPC_PORT_DOWN:
+ if req.config & ofp.OFPPC_PORT_DOWN:
+ self.rpc.disable_port(self.device_id, port.id)
+ else:
+ self.rpc.enable_port(self.device_id, port.id)
+
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_table_mod_request(self, req):
raise NotImplementedError()
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index ac5c3aa..bfaaf04 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -28,7 +28,7 @@
from voltha.protos.voltha_pb2 import \
add_VolthaGlobalServiceServicer_to_server, VolthaLocalServiceStub, \
VolthaGlobalServiceServicer, Voltha, VolthaInstances, VolthaInstance, \
- LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse, \
+ LogicalDevice, LogicalPort, Ports, Flows, FlowGroups, Device, SelfTestResponse, \
VolthaGlobalServiceStub, Devices, DeviceType, DeviceTypes, DeviceGroup, \
AlarmFilter, AlarmFilters
from voltha.registry import registry
@@ -207,6 +207,26 @@
@twisted_async
@inlineCallbacks
+ def GetLogicalDevicePort(self, request, context):
+ log.info('grpc-request', request=request)
+
+ response = yield self.dispatcher.dispatch('GetLogicalDevicePort',
+ request,
+ context,
+ id=request.id)
+ log.debug('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.warn('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical port \'{}\' on device \'{}\' error'.format(request.port_id, request.id))
+ context.set_code(response.error_code)
+ returnValue(LogicalPort())
+ else:
+ log.debug('grpc-success-response', response=response)
+ returnValue(response)
+
+ @twisted_async
+ @inlineCallbacks
def ListLogicalDeviceFlows(self, request, context):
log.info('grpc-request', request=request)
response = yield self.dispatcher.dispatch('ListLogicalDeviceFlows',
@@ -226,6 +246,44 @@
@twisted_async
@inlineCallbacks
+ def EnableLogicalDevicePort(self, request, context):
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('EnableLogicalDevicePort',
+ request,
+ context,
+ id=request.id)
+ log.debug('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.warn('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.debug('grpc-success-response', response=response)
+ returnValue(response)
+
+ @twisted_async
+ @inlineCallbacks
+ def DisableLogicalDevicePort(self, request, context):
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('DisableLogicalDevicePort',
+ request,
+ context,
+ id=request.id)
+ log.debug('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.warn('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.debug('grpc-success-response', response=response)
+ returnValue(response)
+
+ @twisted_async
+ @inlineCallbacks
def UpdateLogicalDeviceFlowTable(self, request, context):
log.info('grpc-request', request=request)
response = yield self.dispatcher.dispatch(
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 4f5d09c..26016ed 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -29,7 +29,7 @@
from voltha.protos.voltha_pb2 import \
add_VolthaLocalServiceServicer_to_server, VolthaLocalServiceServicer, \
VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
- LogicalPorts, Devices, Device, DeviceType, \
+ LogicalPort, LogicalPorts, Devices, Device, DeviceType, \
DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent, \
AlarmFilter, AlarmFilters, SelfTestResponse, OfAgentSubscriber
from voltha.protos.device_pb2 import PmConfigs, Images, ImageDownload, ImageDownloads
@@ -178,6 +178,25 @@
return LogicalPorts()
@twisted_async
+ def GetLogicalDevicePort(self, request, context):
+ log.info('grpc-request', requst=request)
+
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed logical device id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return LogicalPort()
+
+ try:
+ return self.root.get(
+ '/logical_devices/{}/ports/{}'.format(request.id, request.port_id))
+ except KeyError:
+ context.set_details(
+ 'Logical port \'{}\' not found on device \'{}\''.format(request.port_id, request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return LogicalPort()
+
+ @twisted_async
def ListLogicalDeviceFlows(self, request, context):
log.info('grpc-request', request=request)
@@ -198,6 +217,46 @@
return Flows()
@twisted_async
+ def EnableLogicalDevicePort(self, request, context):
+ log.info('grpc-request', request=request)
+
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed logical device id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return Empty()
+
+ try:
+ agent = self.core.get_logical_device_agent(request.id)
+ agent.port_enable(request.port_id)
+ return Empty()
+ except KeyError:
+ context.set_details(
+ 'Logical device \'{}\' not found'.format(request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return Empty()
+
+ @twisted_async
+ def DisableLogicalDevicePort(self, request, context):
+ log.info('grpc-request', request=request)
+
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed logical device id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return Empty()
+
+ try:
+ agent = self.core.get_logical_device_agent(request.id)
+ agent.port_disable(request.port_id)
+ return Empty()
+ except KeyError:
+ context.set_details(
+ 'Logical device \'{}\' not found'.format(request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return Empty()
+
+ @twisted_async
def UpdateLogicalDeviceFlowTable(self, request, context):
log.info('grpc-request', request=request)
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 5949372..c321da4 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -469,6 +469,22 @@
if changed:
self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+ def port_enable(self, port_id):
+ self.log.info("port-enable", port_id=port_id)
+
+ proxy = self.port_proxy[port_id]
+ port = proxy.get('/')
+ port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN
+ proxy.update('/', port)
+
+ def port_disable(self, port_id):
+ self.log.info("port-disable", port_id=port_id)
+
+ proxy = self.port_proxy[port_id]
+ port = proxy.get('/')
+ port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN | ofp.OFPPC_PORT_DOWN
+ proxy.update('/', port)
+
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def packet_out(self, ofp_packet_out):
diff --git a/voltha/protos/logical_device.proto b/voltha/protos/logical_device.proto
index b35fb90..92b2b4f 100644
--- a/voltha/protos/logical_device.proto
+++ b/voltha/protos/logical_device.proto
@@ -8,6 +8,13 @@
import "google/api/annotations.proto";
import "openflow_13.proto";
+message LogicalPortId {
+ // unique id of logical device
+ string id = 1;
+
+ // id of the port on the logical device
+ string port_id = 2;
+}
message LogicalPort {
string id = 1;
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 849c95c..7f5a08f 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -53,7 +53,7 @@
message AlarmFilterRuleKey {
option (yang_child_rule) = MOVE_TO_PARENT_LEVEL;
-
+
enum AlarmFilterRuleKey {
id = 0;
type = 1;
@@ -290,6 +290,28 @@
option (voltha.yang_xml_tag).xml_tag = 'ports';
}
+ // Gets a logical device port
+ rpc GetLogicalDevicePort(LogicalPortId) returns(LogicalPort) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/ports/{port_id}"
+ };
+ option (voltha.yang_xml_tag).xml_tag = 'port';
+ }
+
+ // Enables a logical device port
+ rpc EnableLogicalDevicePort(LogicalPortId) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/ports/{port_id}/enable"
+ };
+ }
+
+ // Disables a logical device port
+ rpc DisableLogicalDevicePort(LogicalPortId) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/ports/{port_id}/disable"
+ };
+ }
+
// List all flows of a logical device
rpc ListLogicalDeviceFlows(ID) returns(openflow_13.Flows) {
option (google.api.http) = {
@@ -350,8 +372,8 @@
}
// Enable a device. If the device was in pre-provisioned state then it
- // will tansition to ENABLED state. If it was is DISABLED state then it
- // will tansition to ENABLED state as well.
+ // will transition to ENABLED state. If it was is DISABLED state then it
+ // will transition to ENABLED state as well.
rpc EnableDevice(ID) returns(google.protobuf.Empty) {
option (google.api.http) = {
post: "/api/v1/devices/{id}/enable"
@@ -983,6 +1005,28 @@
option (voltha.yang_xml_tag).xml_tag = 'ports';
}
+ // Gets a logical device port
+ rpc GetLogicalDevicePort(LogicalPortId) returns(LogicalPort) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/ports/{port_id}"
+ };
+ option (voltha.yang_xml_tag).xml_tag = 'port';
+ }
+
+ // Enables a logical device port
+ rpc EnableLogicalDevicePort(LogicalPortId) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/ports/{port_id}/enable"
+ };
+ }
+
+ // Disables a logical device port
+ rpc DisableLogicalDevicePort(LogicalPortId) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/ports/{port_id}/disable"
+ };
+ }
+
// List all flows of a logical device
rpc ListLogicalDeviceFlows(ID) returns(openflow_13.Flows) {
option (google.api.http) = {