Draft implementation of DPN P4 communicator
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
index af706e6..7228c6c 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
@@ -16,29 +16,327 @@
package org.onosproject.fpcagent.protocols;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.onlab.packet.Ip4Address;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.pi.model.PiActionId;
+import org.onosproject.net.pi.model.PiActionParamId;
+import org.onosproject.net.pi.model.PiMatchFieldId;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionParam;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-public class DpnP4Communicator extends P4DpnControlProtocol implements DpnCommunicationService {
- @Override
- public void create_session(byte topic_id, BigInteger imsi, Short default_ebi, Ip4Address ue_ipv4, Long s1u_sgw_teid, Ip4Address s1u_sgw_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.ImmutableByteSequence.copyFrom;
+public class DpnP4Communicator
+ extends P4DpnControlProtocol
+ implements DpnCommunicationService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static PiTableId TBL_ID_UE_FILTER = PiTableId
+ .of("spgw_ingress.ue_filter_table");
+ private static PiTableId TBL_ID_S1U_FILTER = PiTableId
+ .of("spgw_ingress.s1u_filter_table");
+ private static PiTableId TBL_ID_DL_SESS_LOOKUP = PiTableId
+ .of("spgw_ingress.dl_sess_lookup");
+
+ private static PiMatchFieldId MF_ID_IPV4_DST = PiMatchFieldId
+ .of("ipv4.dst_addr");
+ private static PiMatchFieldId MF_ID_GTPU_IPV4_DST = PiMatchFieldId
+ .of("gtpu_ipv4.dst_addr");
+
+ private static PiActionId ACT_ID_SET_DL_SESS_INFO = PiActionId
+ .of("spgw_ingress.set_dl_sess_info");
+ private static PiActionParamId ACT_PARAM_ID_DL_TEID = PiActionParamId
+ .of("dl_sess_teid");
+ private static PiActionParamId ACT_PARAM_ID_DL_ENB_ADDR = PiActionParamId
+ .of("dl_sess_enb_addr");
+ private static PiActionParamId ACT_PARAM_ID_DL_S1U_ADDR = PiActionParamId
+ .of("dl_sess_s1u_addr");
+
+ private static PiAction NO_ACTION = PiAction.builder()
+ .withId(PiActionId.of("NoAction"))
+ .build();
+
+ private static byte DIR_UPLINK = (byte) 0;
+ private static byte DIR_DOWNLINK = (byte) 1;
+
+ private static ConcurrentMap<Long, Ip4Address> SESS_ID_TO_UE_ADDR =
+ Maps.newConcurrentMap();
+ private static ConcurrentMap<Long, Ip4Address> SESS_ID_TO_S1U_ADDR =
+ Maps.newConcurrentMap();
+ private static ConcurrentMap<Long, Set<FlowRule>> SESS_ID_TO_FLOWS =
+ Maps.newConcurrentMap();
+
+ // FIXME: should use a cache with timeout
+ private static Map<Long, Lock> SESS_LOCKS = Maps.newConcurrentMap();
+
+ private ApplicationId appId;
+ private DeviceId deviceId;
+ private FlowRuleService flowRuleService;
+ private boolean initialized = false;
+
+ public void init(ApplicationId appId, DeviceId deviceId, FlowRuleService flowRuleService) {
+ this.appId = checkNotNull(appId);
+ this.deviceId = checkNotNull(deviceId);
+ this.flowRuleService = checkNotNull(flowRuleService);
+ this.initialized = true;
+ }
+
+ private boolean isNotInit() {
+ if (!this.initialized) {
+ log.error("Not initialized. Cannot perform operations.");
+ return true;
+ } else {
+ return false;
+ }
}
@Override
- public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4, Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+ public void create_session(byte topic_id, BigInteger imsi, Short default_ebi,
+ Ip4Address ue_ipv4, Long s1u_sgw_teid,
+ Ip4Address s1u_sgw_ipv4, Long session_id,
+ Long client_id, BigInteger op_id) {
+ log.info("create_session(topic_id={}, imsi={}, default_ebi={}, " +
+ "ue_ipv4={}, s1u_sgw_teid={}, s1u_sgw_ipv4={}, " +
+ "session_id={}, client_id={}, op_id={})",
+ topic_id, imsi, default_ebi, ue_ipv4, s1u_sgw_teid,
+ s1u_sgw_ipv4, session_id, client_id, op_id);
+
+ if (isNotInit()) {
+ return;
+ }
+
+ SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
+ SESS_LOCKS.get(session_id).lock();
+ try {
+ if (SESS_ID_TO_FLOWS.containsKey(session_id)
+ && !SESS_ID_TO_FLOWS.get(session_id).isEmpty()) {
+ log.warn("Creating session {}, but {} rules already exists for such session.",
+ session_id, SESS_ID_TO_FLOWS.get(session_id).size());
+ SESS_ID_TO_FLOWS.get(session_id).forEach(f -> log.debug("{}", f));
+ }
+
+ // If old session is there. We keep the old rules.
+ SESS_ID_TO_FLOWS.putIfAbsent(session_id, Sets.newHashSet());
+ SESS_ID_TO_UE_ADDR.put(session_id, ue_ipv4);
+ SESS_ID_TO_S1U_ADDR.put(session_id, s1u_sgw_ipv4);
+
+ SESS_ID_TO_FLOWS.get(session_id).add(ueFilterRule(ue_ipv4));
+ SESS_ID_TO_FLOWS.get(session_id).add(s1uFilterRule(s1u_sgw_ipv4));
+
+ applySessionRules(session_id);
+ } finally {
+ SESS_LOCKS.get(session_id).unlock();
+ }
}
@Override
- public void delete_session(byte topic_id, Long session_id, Long client_id, BigInteger op_id) {
+ public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4,
+ Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4,
+ Long session_id, Long client_id, BigInteger op_id) {
+ log.info("modify_bearer(topic_id={}, s1u_sgw_ipv4={}, " +
+ "s1u_enodeb_teid={}, s1u_enodeb_ipv4={}, " +
+ "session_id={}, client_id={}, op_id={})",
+ topic_id, s1u_sgw_ipv4, s1u_enodeb_teid, s1u_enodeb_ipv4,
+ session_id, client_id, op_id);
+
+ if (isNotInit()) {
+ return;
+ }
+
+ SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
+ SESS_LOCKS.get(session_id).lock();
+ try {
+ if (!SESS_ID_TO_UE_ADDR.containsKey(session_id)) {
+ log.error("Missing sess ID in SESS_ID_TO_UE_ADDR map: {}",
+ session_id);
+ return;
+ }
+ if (!SESS_ID_TO_S1U_ADDR.containsKey(session_id)) {
+ log.error("Missing sess ID in SESS_ID_TO_S1U_ADDR map: {}",
+ session_id);
+ return;
+ }
+
+ final Ip4Address ueAddr = SESS_ID_TO_UE_ADDR.get(session_id);
+ final Ip4Address s1uAddr = SESS_ID_TO_S1U_ADDR.get(session_id);
+
+ SESS_ID_TO_FLOWS.get(session_id)
+ .add(dlSessLookupRule(ueAddr, s1u_enodeb_teid,
+ s1u_enodeb_ipv4, s1uAddr));
+
+ applySessionRules(session_id);
+ } finally {
+ SESS_LOCKS.get(session_id).unlock();
+ }
}
@Override
- public void send_ADC_rules(Short topic, String domain_name, String ip, Short drop, Long rating_group, Long service_ID, String sponsor_ID) {
+ public void delete_session(byte topic_id, Long session_id, Long client_id,
+ BigInteger op_id) {
+ log.info("delete_session(topic_id={}, session_id={}, client_id={}, " +
+ "op_id={})",
+ topic_id, session_id, client_id, op_id);
+
+ if (isNotInit()) {
+ return;
+ }
+
+ SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
+ SESS_LOCKS.get(session_id).lock();
+ try {
+ SESS_ID_TO_S1U_ADDR.remove(session_id);
+ SESS_ID_TO_UE_ADDR.remove(session_id);
+
+ if (!SESS_ID_TO_FLOWS.containsKey(session_id)
+ || SESS_ID_TO_FLOWS.get(session_id).isEmpty()) {
+ log.warn("Deleting session {}, but no rules exist for this session",
+ session_id);
+ } else {
+ removeSessionRules(session_id);
+ }
+ } finally {
+ SESS_LOCKS.get(session_id).unlock();
+ }
+ }
+
+ @Override
+ public void send_ADC_rules(Short topic, String domain_name, String ip,
+ Short drop, Long rating_group, Long service_ID,
+ String sponsor_ID) {
+
+ log.info("send_ADC_rules(topic={}, domain_name={}, ip={}, drop={}, " +
+ "rating_group={}, service_ID={}, sponsor_ID={})",
+ topic, domain_name, ip, drop, rating_group, service_ID,
+ sponsor_ID);
+
+ if (isNotInit()) {
+ return;
+ }
+
+ log.warn("send_ADC_rules() UNIMPLEMENTED!");
+ }
+
+ private FlowRule ueFilterRule(Ip4Address ueAddr) {
+ final PiCriterion piCriterion = PiCriterion.builder()
+ .matchExact(MF_ID_IPV4_DST, ueAddr.toOctets())
+ .build();
+
+ return flowRuleBuilder()
+ .forTable(TBL_ID_UE_FILTER)
+ .withSelector(DefaultTrafficSelector.builder()
+ .matchPi(piCriterion)
+ .build())
+ .withTreatment(DefaultTrafficTreatment.builder()
+ .piTableAction(NO_ACTION)
+ .build())
+ .build();
+ }
+
+ private FlowRule s1uFilterRule(Ip4Address s1uAddr) {
+ final PiCriterion piCriterion = PiCriterion.builder()
+ .matchExact(MF_ID_GTPU_IPV4_DST, s1uAddr.toOctets())
+ .build();
+
+ return flowRuleBuilder()
+ .forTable(TBL_ID_S1U_FILTER)
+ .withSelector(DefaultTrafficSelector.builder()
+ .matchPi(piCriterion)
+ .build())
+ .withTreatment(DefaultTrafficTreatment.builder()
+ .piTableAction(NO_ACTION)
+ .build())
+ .build();
+ }
+
+ private FlowRule dlSessLookupRule(Ip4Address ueAddr, long teid,
+ Ip4Address enbAddr, Ip4Address s1uAddr) {
+ // Add rule to
+ final PiAction action = PiAction.builder()
+ .withId(ACT_ID_SET_DL_SESS_INFO)
+ .withParameter(new PiActionParam(ACT_PARAM_ID_DL_TEID,
+ copyFrom(teid)))
+ .withParameter(new PiActionParam(ACT_PARAM_ID_DL_ENB_ADDR,
+ copyFrom(enbAddr.toOctets())))
+ .withParameter(new PiActionParam(ACT_PARAM_ID_DL_S1U_ADDR,
+ copyFrom(s1uAddr.toOctets())))
+ .build();
+
+ final PiCriterion piCriterion = PiCriterion.builder()
+ .matchExact(MF_ID_IPV4_DST, ueAddr.toOctets())
+ .build();
+
+ return flowRuleBuilder()
+ .forTable(TBL_ID_DL_SESS_LOOKUP)
+ .withSelector(DefaultTrafficSelector.builder()
+ .matchPi(piCriterion)
+ .build())
+ .withTreatment(DefaultTrafficTreatment.builder()
+ .piTableAction(action)
+ .build())
+ .build();
+ }
+
+ private FlowRule ueCdrRule(Ip4Address ueAddr) {
+ // FIXME: should we count on downlink, uplink or both?
+ return null;
+ }
+
+ private FlowRule.Builder flowRuleBuilder() {
+ return DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withPriority(0)
+ .makePermanent()
+ .fromApp(appId);
+ }
+
+ private void applySessionRules(long sessionId) {
+ if (SESS_ID_TO_FLOWS.containsKey(sessionId)) {
+ batchApply(SESS_ID_TO_FLOWS.get(sessionId));
+ }
+ }
+
+ private void removeSessionRules(long sessionId) {
+ if (SESS_ID_TO_FLOWS.containsKey(sessionId)) {
+ batchRemove(SESS_ID_TO_FLOWS.get(sessionId));
+ }
+ }
+
+ private void batchApply(Collection<FlowRule> rules) {
+ final FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+ rules.forEach(opsBuilder::add);
+ flowRuleService.apply(opsBuilder.build());
+ }
+
+ private void batchRemove(Collection<FlowRule> rules) {
+ final FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+ rules.forEach(opsBuilder::remove);
+ flowRuleService.apply(opsBuilder.build());
}
}