Draft implementation of DPN P4 communicator
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
index af706e6..7228c6c 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/protocols/DpnP4Communicator.java
@@ -16,29 +16,327 @@
 
 package org.onosproject.fpcagent.protocols;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.onlab.packet.Ip4Address;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.pi.model.PiActionId;
+import org.onosproject.net.pi.model.PiActionParamId;
+import org.onosproject.net.pi.model.PiMatchFieldId;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionParam;
 import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-public class DpnP4Communicator extends P4DpnControlProtocol implements DpnCommunicationService {
-    @Override
-    public void create_session(byte topic_id, BigInteger imsi, Short default_ebi, Ip4Address ue_ipv4, Long s1u_sgw_teid, Ip4Address s1u_sgw_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.ImmutableByteSequence.copyFrom;
 
+public class DpnP4Communicator
+        extends P4DpnControlProtocol
+        implements DpnCommunicationService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static PiTableId TBL_ID_UE_FILTER = PiTableId
+            .of("spgw_ingress.ue_filter_table");
+    private static PiTableId TBL_ID_S1U_FILTER = PiTableId
+            .of("spgw_ingress.s1u_filter_table");
+    private static PiTableId TBL_ID_DL_SESS_LOOKUP = PiTableId
+            .of("spgw_ingress.dl_sess_lookup");
+
+    private static PiMatchFieldId MF_ID_IPV4_DST = PiMatchFieldId
+            .of("ipv4.dst_addr");
+    private static PiMatchFieldId MF_ID_GTPU_IPV4_DST = PiMatchFieldId
+            .of("gtpu_ipv4.dst_addr");
+
+    private static PiActionId ACT_ID_SET_DL_SESS_INFO = PiActionId
+            .of("spgw_ingress.set_dl_sess_info");
+    private static PiActionParamId ACT_PARAM_ID_DL_TEID = PiActionParamId
+            .of("dl_sess_teid");
+    private static PiActionParamId ACT_PARAM_ID_DL_ENB_ADDR = PiActionParamId
+            .of("dl_sess_enb_addr");
+    private static PiActionParamId ACT_PARAM_ID_DL_S1U_ADDR = PiActionParamId
+            .of("dl_sess_s1u_addr");
+
+    private static PiAction NO_ACTION = PiAction.builder()
+            .withId(PiActionId.of("NoAction"))
+            .build();
+
+    private static byte DIR_UPLINK = (byte) 0;
+    private static byte DIR_DOWNLINK = (byte) 1;
+
+    private static ConcurrentMap<Long, Ip4Address> SESS_ID_TO_UE_ADDR =
+            Maps.newConcurrentMap();
+    private static ConcurrentMap<Long, Ip4Address> SESS_ID_TO_S1U_ADDR =
+            Maps.newConcurrentMap();
+    private static ConcurrentMap<Long, Set<FlowRule>> SESS_ID_TO_FLOWS =
+            Maps.newConcurrentMap();
+
+    // FIXME: should use a cache with timeout
+    private static Map<Long, Lock> SESS_LOCKS = Maps.newConcurrentMap();
+
+    private ApplicationId appId;
+    private DeviceId deviceId;
+    private FlowRuleService flowRuleService;
+    private boolean initialized = false;
+
+    public void init(ApplicationId appId, DeviceId deviceId, FlowRuleService flowRuleService) {
+        this.appId = checkNotNull(appId);
+        this.deviceId = checkNotNull(deviceId);
+        this.flowRuleService = checkNotNull(flowRuleService);
+        this.initialized = true;
+    }
+
+    private boolean isNotInit() {
+        if (!this.initialized) {
+            log.error("Not initialized. Cannot perform operations.");
+            return true;
+        } else {
+            return false;
+        }
     }
 
     @Override
-    public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4, Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4, Long session_id, Long client_id, BigInteger op_id) {
+    public void create_session(byte topic_id, BigInteger imsi, Short default_ebi,
+                               Ip4Address ue_ipv4, Long s1u_sgw_teid,
+                               Ip4Address s1u_sgw_ipv4, Long session_id,
+                               Long client_id, BigInteger op_id) {
 
+        log.info("create_session(topic_id={}, imsi={}, default_ebi={}, " +
+                         "ue_ipv4={}, s1u_sgw_teid={}, s1u_sgw_ipv4={}, " +
+                         "session_id={}, client_id={}, op_id={})",
+                 topic_id, imsi, default_ebi, ue_ipv4, s1u_sgw_teid,
+                 s1u_sgw_ipv4, session_id, client_id, op_id);
+
+        if (isNotInit()) {
+            return;
+        }
+
+        SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
+        SESS_LOCKS.get(session_id).lock();
+        try {
+            if (SESS_ID_TO_FLOWS.containsKey(session_id)
+                    && !SESS_ID_TO_FLOWS.get(session_id).isEmpty()) {
+                log.warn("Creating session {}, but {} rules already exists for such session.",
+                         session_id, SESS_ID_TO_FLOWS.get(session_id).size());
+                SESS_ID_TO_FLOWS.get(session_id).forEach(f -> log.debug("{}", f));
+            }
+
+            // If old session is there. We keep the old rules.
+            SESS_ID_TO_FLOWS.putIfAbsent(session_id, Sets.newHashSet());
+            SESS_ID_TO_UE_ADDR.put(session_id, ue_ipv4);
+            SESS_ID_TO_S1U_ADDR.put(session_id, s1u_sgw_ipv4);
+
+            SESS_ID_TO_FLOWS.get(session_id).add(ueFilterRule(ue_ipv4));
+            SESS_ID_TO_FLOWS.get(session_id).add(s1uFilterRule(s1u_sgw_ipv4));
+
+            applySessionRules(session_id);
+        } finally {
+            SESS_LOCKS.get(session_id).unlock();
+        }
     }
 
     @Override
-    public void delete_session(byte topic_id, Long session_id, Long client_id, BigInteger op_id) {
+    public void modify_bearer(byte topic_id, Ip4Address s1u_sgw_ipv4,
+                              Long s1u_enodeb_teid, Ip4Address s1u_enodeb_ipv4,
+                              Long session_id, Long client_id, BigInteger op_id) {
 
+        log.info("modify_bearer(topic_id={}, s1u_sgw_ipv4={}, " +
+                         "s1u_enodeb_teid={}, s1u_enodeb_ipv4={}, " +
+                         "session_id={}, client_id={}, op_id={})",
+                 topic_id, s1u_sgw_ipv4, s1u_enodeb_teid, s1u_enodeb_ipv4,
+                 session_id, client_id, op_id);
+
+        if (isNotInit()) {
+            return;
+        }
+
+        SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
+        SESS_LOCKS.get(session_id).lock();
+        try {
+            if (!SESS_ID_TO_UE_ADDR.containsKey(session_id)) {
+                log.error("Missing sess ID in SESS_ID_TO_UE_ADDR map: {}",
+                          session_id);
+                return;
+            }
+            if (!SESS_ID_TO_S1U_ADDR.containsKey(session_id)) {
+                log.error("Missing sess ID in SESS_ID_TO_S1U_ADDR map: {}",
+                          session_id);
+                return;
+            }
+
+            final Ip4Address ueAddr = SESS_ID_TO_UE_ADDR.get(session_id);
+            final Ip4Address s1uAddr = SESS_ID_TO_S1U_ADDR.get(session_id);
+
+            SESS_ID_TO_FLOWS.get(session_id)
+                    .add(dlSessLookupRule(ueAddr, s1u_enodeb_teid,
+                                          s1u_enodeb_ipv4, s1uAddr));
+
+            applySessionRules(session_id);
+        } finally {
+            SESS_LOCKS.get(session_id).unlock();
+        }
     }
 
     @Override
-    public void send_ADC_rules(Short topic, String domain_name, String ip, Short drop, Long rating_group, Long service_ID, String sponsor_ID) {
+    public void delete_session(byte topic_id, Long session_id, Long client_id,
+                               BigInteger op_id) {
 
+        log.info("delete_session(topic_id={}, session_id={}, client_id={}, " +
+                         "op_id={})",
+                 topic_id, session_id, client_id, op_id);
+
+        if (isNotInit()) {
+            return;
+        }
+
+        SESS_LOCKS.putIfAbsent(session_id, new ReentrantLock());
+        SESS_LOCKS.get(session_id).lock();
+        try {
+            SESS_ID_TO_S1U_ADDR.remove(session_id);
+            SESS_ID_TO_UE_ADDR.remove(session_id);
+
+            if (!SESS_ID_TO_FLOWS.containsKey(session_id)
+                    || SESS_ID_TO_FLOWS.get(session_id).isEmpty()) {
+                log.warn("Deleting session {}, but no rules exist for this session",
+                         session_id);
+            } else {
+                removeSessionRules(session_id);
+            }
+        } finally {
+            SESS_LOCKS.get(session_id).unlock();
+        }
+    }
+
+    @Override
+    public void send_ADC_rules(Short topic, String domain_name, String ip,
+                               Short drop, Long rating_group, Long service_ID,
+                               String sponsor_ID) {
+
+        log.info("send_ADC_rules(topic={}, domain_name={}, ip={}, drop={}, " +
+                         "rating_group={}, service_ID={}, sponsor_ID={})",
+                 topic, domain_name, ip, drop, rating_group, service_ID,
+                 sponsor_ID);
+
+        if (isNotInit()) {
+            return;
+        }
+
+        log.warn("send_ADC_rules() UNIMPLEMENTED!");
+    }
+
+    private FlowRule ueFilterRule(Ip4Address ueAddr) {
+        final PiCriterion piCriterion = PiCriterion.builder()
+                .matchExact(MF_ID_IPV4_DST, ueAddr.toOctets())
+                .build();
+
+        return flowRuleBuilder()
+                .forTable(TBL_ID_UE_FILTER)
+                .withSelector(DefaultTrafficSelector.builder()
+                                      .matchPi(piCriterion)
+                                      .build())
+                .withTreatment(DefaultTrafficTreatment.builder()
+                                       .piTableAction(NO_ACTION)
+                                       .build())
+                .build();
+    }
+
+    private FlowRule s1uFilterRule(Ip4Address s1uAddr) {
+        final PiCriterion piCriterion = PiCriterion.builder()
+                .matchExact(MF_ID_GTPU_IPV4_DST, s1uAddr.toOctets())
+                .build();
+
+        return flowRuleBuilder()
+                .forTable(TBL_ID_S1U_FILTER)
+                .withSelector(DefaultTrafficSelector.builder()
+                                      .matchPi(piCriterion)
+                                      .build())
+                .withTreatment(DefaultTrafficTreatment.builder()
+                                       .piTableAction(NO_ACTION)
+                                       .build())
+                .build();
+    }
+
+    private FlowRule dlSessLookupRule(Ip4Address ueAddr, long teid,
+                                      Ip4Address enbAddr, Ip4Address s1uAddr) {
+        // Add rule to
+        final PiAction action = PiAction.builder()
+                .withId(ACT_ID_SET_DL_SESS_INFO)
+                .withParameter(new PiActionParam(ACT_PARAM_ID_DL_TEID,
+                                                 copyFrom(teid)))
+                .withParameter(new PiActionParam(ACT_PARAM_ID_DL_ENB_ADDR,
+                                                 copyFrom(enbAddr.toOctets())))
+                .withParameter(new PiActionParam(ACT_PARAM_ID_DL_S1U_ADDR,
+                                                 copyFrom(s1uAddr.toOctets())))
+                .build();
+
+        final PiCriterion piCriterion = PiCriterion.builder()
+                .matchExact(MF_ID_IPV4_DST, ueAddr.toOctets())
+                .build();
+
+        return flowRuleBuilder()
+                .forTable(TBL_ID_DL_SESS_LOOKUP)
+                .withSelector(DefaultTrafficSelector.builder()
+                                      .matchPi(piCriterion)
+                                      .build())
+                .withTreatment(DefaultTrafficTreatment.builder()
+                                       .piTableAction(action)
+                                       .build())
+                .build();
+    }
+
+    private FlowRule ueCdrRule(Ip4Address ueAddr) {
+        // FIXME: should we count on downlink, uplink or both?
+        return null;
+    }
+
+    private FlowRule.Builder flowRuleBuilder() {
+        return DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .withPriority(0)
+                .makePermanent()
+                .fromApp(appId);
+    }
+
+    private void applySessionRules(long sessionId) {
+        if (SESS_ID_TO_FLOWS.containsKey(sessionId)) {
+            batchApply(SESS_ID_TO_FLOWS.get(sessionId));
+        }
+    }
+
+    private void removeSessionRules(long sessionId) {
+        if (SESS_ID_TO_FLOWS.containsKey(sessionId)) {
+            batchRemove(SESS_ID_TO_FLOWS.get(sessionId));
+        }
+    }
+
+    private void batchApply(Collection<FlowRule> rules) {
+        final FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+        rules.forEach(opsBuilder::add);
+        flowRuleService.apply(opsBuilder.build());
+    }
+
+    private void batchRemove(Collection<FlowRule> rules) {
+        final FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+        rules.forEach(opsBuilder::remove);
+        flowRuleService.apply(opsBuilder.build());
     }
 }