/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.opencord.igmpproxy;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.onlab.packet.EthType;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IGMP;
import org.onlab.packet.IGMPGroup;
import org.onlab.packet.IGMPMembership;
import org.onlab.packet.IGMPQuery;
import org.onlab.packet.IPv4;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.basics.McastConfig;
import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.mcast.api.McastRoute;
import org.onosproject.mcast.api.MulticastRouteService;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.opencord.cordconfig.access.AccessDeviceConfig;
import org.opencord.cordconfig.access.AccessDeviceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;

/**
 * Igmp process application, use proxy mode, support first join/ last leave , fast leave
 * period query and keep alive, packet out igmp message to uplink port features.
 */
@Component(immediate = true)
public class IgmpManager {

    private static final Class<AccessDeviceConfig> CONFIG_CLASS =
            AccessDeviceConfig.class;
    private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
            IgmpproxyConfig.class;
    private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
            IgmpproxySsmTranslateConfig.class;
    private static final Class<McastConfig> MCAST_CONFIG_CLASS =
            McastConfig.class;

    public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
    private static ApplicationId appId;
    private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
    private static int unSolicitedTimeout = 3; // unit is 1 sec
    private static int keepAliveCount = 3;
    private static int lastQueryInterval = 2;  //unit is 1 sec
    private static int lastQueryCount = 2;
    private static boolean fastLeave = true;
    private static boolean withRAUplink = true;
    private static boolean withRADownlink = false;
    private static boolean periodicQuery = true;
    private static short mvlan = 4000;
    private static byte igmpCos = 7;
    public static boolean connectPointMode = true;
    public static ConnectPoint connectPoint = null;
    private static ConnectPoint sourceDeviceAndPort = null;
    private static boolean enableIgmpProvisioning = false;
    private static boolean igmpOnPodBasis = false;

    private static final Integer MAX_PRIORITY = 10000;
    private static final String INSTALLED = "installed";
    private static final String REMOVED = "removed";
    private static final String INSTALLATION = "installation";
    private static final String REMOVAL = "removal";

    private static boolean pimSSmInterworking = false;
    private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(1);
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected CoreService coreService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected PacketService packetService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected MastershipService mastershipService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected FlowRuleService flowRuleService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected DeviceService deviceService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected FlowObjectiveService flowObjectiveService;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected NetworkConfigRegistry networkConfig;
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected MulticastRouteService multicastService;
    private IgmpPacketProcessor processor = new IgmpPacketProcessor();
    private Logger log = LoggerFactory.getLogger(getClass());
    private ApplicationId coreAppId;
    private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();

    private InternalNetworkConfigListener configListener =
            new InternalNetworkConfigListener();
    private DeviceListener deviceListener = new InternalDeviceListener();
    private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
    private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
            new ConfigFactory<ApplicationId, IgmpproxyConfig>(
                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
                @Override
                public IgmpproxyConfig createConfig() {
                    return new IgmpproxyConfig();
                }
            };
    private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
            new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
                @Override
                public IgmpproxySsmTranslateConfig createConfig() {
                    return new IgmpproxySsmTranslateConfig();
                }
            };

    private int maxResp = 10; //unit is 1 sec
    private int keepAliveInterval = 120; //unit is 1 sec

    private ExecutorService eventExecutor;

    public static int getUnsolicitedTimeout() {
        return unSolicitedTimeout;
    }

    @Activate
    protected void activate() {
        appId = coreService.registerApplication("org.opencord.igmpproxy");
        coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
        packetService.addProcessor(processor, PacketProcessor.director(4));
        IgmpSender.init(packetService, mastershipService);

        if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
            configFactory =
                    new ConfigFactory<DeviceId, AccessDeviceConfig>(
                            SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
                        @Override
                        public AccessDeviceConfig createConfig() {
                            return new AccessDeviceConfig();
                        }
                    };
            networkConfig.registerConfigFactory(configFactory);
        }
        networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
        networkConfig.registerConfigFactory(igmpproxyConfigFactory);
        networkConfig.addListener(configListener);

        configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
        configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));

        networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
                subject -> {
                    AccessDeviceConfig config = networkConfig.getConfig(subject,
                            AccessDeviceConfig.class);
                    if (config != null) {
                        AccessDeviceData data = config.getAccessDevice();
                        oltData.put(data.deviceId(), data);
                    }
                }
        );

        oltData.keySet().forEach(d -> provisionDefaultFlows(d));
        if (connectPointMode) {
            provisionConnectPointFlows();
        } else {
            provisionUplinkFlows();
        }

        McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
        if (config != null) {
            mvlan = config.egressVlan().toShort();
            IgmpSender.getInstance().setMvlan(mvlan);
        }
        deviceService.addListener(deviceListener);
        scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
                                                                        "events-igmp-%d", log));

        log.info("Started");
    }

    @Deactivate
    protected void deactivate() {
        scheduledExecutorService.shutdown();
        eventExecutor.shutdown();

        // de-register and null our handler
        networkConfig.removeListener(configListener);
        if (configFactory != null) {
            networkConfig.unregisterConfigFactory(configFactory);
        }
        networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
        networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
        deviceService.removeListener(deviceListener);
        packetService.removeProcessor(processor);
        flowRuleService.removeFlowRulesById(appId);

        log.info("Stopped");
    }

    protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
        try {
            String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
                    .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
            return Ip4Address.valueOf(mgmtAddress[0]);
        } catch (Exception ex) {
            log.info("No valid Ipaddress for " + ofDeviceId.toString());
            return null;
        }
    }

    private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {

        DeviceId deviceId = cp.deviceId();
        Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
        maxResp = calculateMaxResp(maxResp);
        if (gAddr != null && !gAddr.isZero()) {
            StateMachine.specialQuery(deviceId, gAddr, maxResp);
        } else {
            StateMachine.generalQuery(deviceId, maxResp);
        }
    }

    private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {

        DeviceId deviceId = cp.deviceId();
        Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
        maxResp = calculateMaxResp(maxResp);
        //The query is received on the ConnectPoint
        // send query accordingly to the registered OLT devices.
        if (gAddr != null && !gAddr.isZero()) {
            for (DeviceId devId : oltData.keySet()) {
                StateMachine.specialQuery(devId, gAddr, maxResp);
            }
        } else {
            //Don't know which group is targeted by the query
            //So query all the members(in all the OLTs) and proxy their reports
            StateMachine.generalQuery(maxResp);
        }
    }


    private int calculateMaxResp(int maxResp) {
        if (maxResp >= 128) {
            int mant = maxResp & 0xf;
            int exp = (maxResp >> 4) & 0x7;
            maxResp = (mant | 0x10) << (exp + 3);
        }

        return (maxResp + 5) / 10;
    }

    private Ip4Address ssmTranslateRoute(IpAddress group) {
        return ssmTranslateTable.get(group);
    }

    private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
        DeviceId deviceId = cp.deviceId();
        PortNumber portNumber = cp.port();

        Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
        if (!groupIp.isMulticast()) {
            log.info(groupIp.toString() + " is not a valid group address");
            return;
        }
        Ip4Address srcIp = getDeviceIp(deviceId);

        byte recordType = igmpGroup.getRecordType();
        boolean join = false;

        ArrayList<Ip4Address> sourceList = new ArrayList<>();

        if (igmpGroup.getSources().size() > 0) {
            igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
            if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
                    recordType == IGMPMembership.MODE_IS_EXCLUDE ||
                    recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
                join = false;
            } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
                    recordType == IGMPMembership.MODE_IS_INCLUDE ||
                    recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
                join = true;
            }
        } else {
            IpAddress src = null;
            if (pimSSmInterworking) {
                src = ssmTranslateRoute(groupIp);
                if (src == null) {
                    log.info("no ssm translate for group " + groupIp.toString());
                    return;
                }
            } else {
                src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
            }
            sourceList.add(src.getIp4Address());
            if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
                    recordType == IGMPMembership.MODE_IS_EXCLUDE ||
                    recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
                join = true;
            } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
                    recordType == IGMPMembership.MODE_IS_INCLUDE ||
                    recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
                join = false;
            }
        }
        String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
        GroupMember groupMember = groupMemberMap.get(groupMemberKey);

        if (join) {
            if (groupMember == null) {
                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
                } else {
                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
                }

                Optional<ConnectPoint> sourceConfigured = getSource();
                if (!sourceConfigured.isPresent()) {
                    log.warn("Unable to process IGMP Join from {} since no source " +
                                     "configuration is found.", deviceId);
                    return;
                }
                HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());

                StateMachine.join(deviceId, groupIp, srcIp);
                groupMemberMap.put(groupMemberKey, groupMember);
                groupMember.updateList(recordType, sourceList);
                groupMember.getSourceList().forEach(source -> {
                    McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
                    //add route
                    multicastService.add(route);
                    //add source to the route
                    multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
                    //add sink to the route
                    multicastService.addSinks(route, Sets.newHashSet(cp));
                });

            }
            groupMember.resetAllTimers();
            groupMember.updateList(recordType, sourceList);
            groupMember.setLeave(false);
        } else {
            if (groupMember == null) {
                log.info("receive leave but no instance, group " + groupIp.toString() +
                        " device:" + deviceId.toString() + " port:" + portNumber.toString());
                return;
            } else {
                groupMember.setLeave(true);
                if (fastLeave) {
                    leaveAction(groupMember);
                } else {
                    sendQuery(groupMember);
                }
            }
        }
    }

    private void leaveAction(GroupMember groupMember) {
        ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
        StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
        groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
                new McastRoute(source, groupMember.getGroupIp(),
                               McastRoute.Type.IGMP), Sets.newHashSet(cp)));
        groupMemberMap.remove(groupMember.getId());
    }

    private void sendQuery(GroupMember groupMember) {
        Ethernet ethpkt;
        Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
        if (groupMember.getv2()) {
            ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
        } else {
            ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
        }
        IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
    }

    /**
     * @return connect point of the source if configured; and empty Optional otherwise.
     */
    public static Optional<ConnectPoint> getSource() {
        return sourceDeviceAndPort == null ? Optional.empty() :
                Optional.of(sourceDeviceAndPort);
    }

    /**
     * Packet processor responsible for forwarding packets along their paths.
     */
    private class IgmpPacketProcessor implements PacketProcessor {
        @Override
        public void process(PacketContext context) {
            eventExecutor.execute(() -> {
                try {
                    InboundPacket pkt = context.inPacket();
                    Ethernet ethPkt = pkt.parsed();
                    if (ethPkt == null) {
                        return;
                    }

                    if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
                        return;
                    }

                    IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();

                    if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
                        return;
                    }

                    short vlan = ethPkt.getVlanID();
                    DeviceId deviceId = pkt.receivedFrom().deviceId();

                    if (oltData.get(deviceId) == null &&
                            !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
                        log.error("Device not registered in netcfg :" + deviceId.toString());
                        return;
                    }

                    IGMP igmp = (IGMP) ipv4Pkt.getPayload();
                    switch (igmp.getIgmpType()) {
                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
                            //Discard Query from OLT’s non-uplink port’s
                            if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
                                if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
                                    log.info("IGMP Picked up query from connectPoint");
                                    //OK to process packet
                                    processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
                                                                 pkt.receivedFrom(),
                                                                 0xff & igmp.getMaxRespField());
                                    break;
                                } else {
                                    //Not OK to process packet
                                    log.warn("IGMP Picked up query from non-uplink port");
                                    return;
                                }
                            }

                            processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
                                             0xff & igmp.getMaxRespField());
                            break;
                        case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
                            log.debug("IGMP version 1  message types are not currently supported.");
                            break;
                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
                        case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
                        case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
                            //Discard join/leave from OLT’s uplink port’s
                            if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
                                    isConnectPoint(deviceId, pkt.receivedFrom().port())) {
                                log.info("IGMP Picked up join/leave from uplink/connectPoint port");
                                return;
                            }

                            Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
                            while (itr.hasNext()) {
                                IGMPGroup group = itr.next();
                                if (group instanceof IGMPMembership) {
                                    processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
                                                      pkt.receivedFrom(), igmp.getIgmpType());
                                } else if (group instanceof IGMPQuery) {
                                    IGMPMembership mgroup;
                                    mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
                                    mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
                                                                 IGMPMembership.MODE_IS_EXCLUDE :
                                                                 IGMPMembership.MODE_IS_INCLUDE);
                                    processIgmpReport(mgroup, VlanId.vlanId(vlan),
                                                      pkt.receivedFrom(), igmp.getIgmpType());
                                }
                            }
                            break;

                        default:
                            log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
                            break;
                    }

                } catch (Exception ex) {
                    log.error("igmp process error : {} ", ex);
                    ex.printStackTrace();
                }
            });
        }
    }

    private class IgmpProxyTimerTask extends TimerTask {
        public void run() {
            try {
                IgmpTimer.timeOut1s();
                queryMembers();
            } catch (Exception ex) {
                log.warn("Igmp timer task error : {}", ex.getMessage());
            }
        }

        private void queryMembers() {
            GroupMember groupMember;
            Set groupMemberSet = groupMemberMap.entrySet();
            Iterator itr = groupMemberSet.iterator();
            while (itr.hasNext()) {
                Map.Entry entry = (Map.Entry) itr.next();
                groupMember = (GroupMember) entry.getValue();
                DeviceId did = groupMember.getDeviceId();
                if (mastershipService.isLocalMaster(did)) {
                    if (groupMember.isLeave()) {
                        lastQuery(groupMember);
                    } else if (periodicQuery) {
                        periodicQuery(groupMember);
                    }
                }
            }
        }

        private void lastQuery(GroupMember groupMember) {
            if (groupMember.getLastQueryInterval() < lastQueryInterval) {
                groupMember.lastQueryInterval(true); // count times
            } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
                sendQuery(groupMember);
                groupMember.lastQueryInterval(false); // reset count number
                groupMember.lastQueryCount(true); //count times
            } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
                leaveAction(groupMember);
            }
        }

        private void periodicQuery(GroupMember groupMember) {
            if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
                groupMember.keepAliveInterval(true);
            } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
                sendQuery(groupMember);
                groupMember.keepAliveInterval(false);
                groupMember.keepAliveQueryCount(true);
            } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
                leaveAction(groupMember);
            }
        }

    }

    public static PortNumber getDeviceUplink(DeviceId devId) {
        if (oltData.get(devId) != null) {
            return oltData.get(devId).uplink();
        } else {
            return null;
        }
    }

    public static boolean isIgmpOnPodBasis() {
        return igmpOnPodBasis;
    }

    private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
        if (!enableIgmpProvisioning) {
            log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
            return;
        }
        //TODO migrate to packet requests when packet service uses filtering objectives
        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();

        builder = remove ? builder.deny() : builder.permit();

        FilteringObjective igmp = builder
                .withKey(Criteria.matchInPort(port))
                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
                .fromApp(appId)
                .withPriority(MAX_PRIORITY)
                .add(new ObjectiveContext() {
                    @Override
                    public void onSuccess(Objective objective) {
                        log.info("Igmp filter for {} on {} {}.",
                                 devId, port, (remove) ? REMOVED : INSTALLED);
                    }

                    @Override
                    public void onError(Objective objective, ObjectiveError error) {
                        log.info("Igmp filter {} for device {} on port {} failed because of {}",
                                 (remove) ? INSTALLATION : REMOVAL, devId, port,
                                 error);
                    }
                });

        flowObjectiveService.filter(devId, igmp);

    }

    private boolean isConnectPoint(DeviceId device, PortNumber port) {
        if (connectPoint != null) {
            return (connectPointMode && connectPoint.deviceId().equals(device)
                    && connectPoint.port().equals(port));
        } else {
            log.info("connectPoint not configured for device {}", device);
            return false;
        }
    }

    private boolean isUplink(DeviceId device, PortNumber port) {
        return ((!connectPointMode) && oltData.containsKey(device)
                && oltData.get(device).uplink().equals(port));
    }

    private class InternalDeviceListener implements DeviceListener {
        @Override
        public void event(DeviceEvent event) {
            DeviceId devId = event.subject().id();
            Port p = event.port();
            if (oltData.get(devId) == null &&
                    !(p != null && isConnectPoint(devId, p.number()))) {
                return;
            }
            PortNumber port;

            switch (event.type()) {

                case DEVICE_ADDED:
                case DEVICE_UPDATED:
                case DEVICE_REMOVED:
                case DEVICE_SUSPENDED:
                case DEVICE_AVAILABILITY_CHANGED:
                case PORT_STATS_UPDATED:
                    break;
                case PORT_ADDED:
                    port = p.number();
                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                        processFilterObjective(devId, port, false);
                    } else if (isUplink(devId, port)) {
                        provisionUplinkFlows();
                    } else if (isConnectPoint(devId, port)) {
                        provisionConnectPointFlows();
                    }
                    break;
                case PORT_UPDATED:
                    port = p.number();
                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                        if (event.port().isEnabled()) {
                            processFilterObjective(devId, port, false);
                        } else {
                            processFilterObjective(devId, port, true);
                        }
                    } else if (isUplink(devId, port)) {
                        if (event.port().isEnabled()) {
                            provisionUplinkFlows(devId);
                        } else {
                            processFilterObjective(devId, port, true);
                        }
                    } else if (isConnectPoint(devId, port)) {
                        if (event.port().isEnabled()) {
                            provisionConnectPointFlows();
                        } else {
                            unprovisionConnectPointFlows();
                        }
                    }
                    break;
                case PORT_REMOVED:
                    port = p.number();
                    processFilterObjective(devId, port, true);
                    break;
                default:
                    log.info("Unknown device event {}", event.type());
                    break;
            }
        }

        @Override
        public boolean isRelevant(DeviceEvent event) {
            return true;
        }
    }

    private class InternalNetworkConfigListener implements NetworkConfigListener {

        private void reconfigureNetwork(IgmpproxyConfig cfg) {
            IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;

            unSolicitedTimeout = newCfg.unsolicitedTimeOut();
            maxResp = newCfg.maxResp();
            keepAliveInterval = newCfg.keepAliveInterval();
            keepAliveCount = newCfg.keepAliveCount();
            lastQueryInterval = newCfg.lastQueryInterval();
            lastQueryCount = newCfg.lastQueryCount();
            withRAUplink = newCfg.withRAUplink();
            withRADownlink = newCfg.withRADownlink();
            igmpCos = newCfg.igmpCos();
            periodicQuery = newCfg.periodicQuery();
            fastLeave = newCfg.fastLeave();
            pimSSmInterworking = newCfg.pimSsmInterworking();
            enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
            igmpOnPodBasis = newCfg.igmpOnPodBasis();

            if (connectPointMode != newCfg.connectPointMode() ||
                    connectPoint != newCfg.connectPoint()) {
                connectPointMode = newCfg.connectPointMode();
                connectPoint = newCfg.connectPoint();
                if (connectPointMode) {
                    unprovisionUplinkFlows();
                    provisionConnectPointFlows();
                } else {
                    unprovisionConnectPointFlows();
                    provisionUplinkFlows();
                }
            }
            if (connectPoint != null) {
                log.info("connect point : {}", connectPoint);
            }
            log.info("mode: {}", connectPointMode);

            getSourceConnectPoint(newCfg);

            IgmpSender.getInstance().setIgmpCos(igmpCos);
            IgmpSender.getInstance().setMaxResp(maxResp);
            IgmpSender.getInstance().setMvlan(mvlan);
            IgmpSender.getInstance().setWithRADownlink(withRADownlink);
            IgmpSender.getInstance().setWithRAUplink(withRAUplink);
        }

        void getSourceConnectPoint(IgmpproxyConfig cfg) {
            sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
            if (sourceDeviceAndPort != null) {
                log.debug("source parameter configured to {}", sourceDeviceAndPort);
            }
        }

        public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
            if (cfg == null) {
                return;
            }
            Collection<McastRoute> translations = cfg.getSsmTranslations();
            for (McastRoute route : translations) {
                ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
            }
        }

        @Override
        public void event(NetworkConfigEvent event) {
            switch (event.type()) {
                case CONFIG_ADDED:
                case CONFIG_UPDATED:
                    if (event.configClass().equals(CONFIG_CLASS)) {
                        AccessDeviceConfig config =
                                networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
                        if (config != null) {
                            oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
                            provisionDefaultFlows((DeviceId) event.subject());
                            provisionUplinkFlows((DeviceId) event.subject());
                        }
                    }

                    if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
                        IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
                        if (config != null) {
                            log.info("igmpproxy config received. {}", config);
                            reconfigureNetwork(config);
                        }
                    }

                    if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
                        IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
                        if (config != null) {
                            reconfigureSsmTable(config);
                        }
                    }

                    if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
                        McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
                        if (config != null && mvlan != config.egressVlan().toShort()) {
                            mvlan = config.egressVlan().toShort();
                            IgmpSender.getInstance().setMvlan(mvlan);
                            groupMemberMap.values().forEach(m -> leaveAction(m));
                        }
                    }

                    log.info("Reconfigured");
                    break;
                case CONFIG_REGISTERED:
                case CONFIG_UNREGISTERED:
                    break;
                case CONFIG_REMOVED:
                    if (event.configClass().equals(CONFIG_CLASS)) {
                        oltData.remove(event.subject());
                    }

                default:
                    break;
            }
        }
    }

    private void provisionDefaultFlows(DeviceId deviceId) {
        List<Port> ports = deviceService.getPorts(deviceId);
        ports.stream()
                .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
                .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
    }

    private void provisionUplinkFlows(DeviceId deviceId) {
        if (connectPointMode) {
            return;
        }

        processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
    }

    private void provisionUplinkFlows() {
        if (connectPointMode) {
            return;
        }

        oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
    }
    private void unprovisionUplinkFlows() {
        oltData.keySet().forEach(deviceId ->
                processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
    }

    private void provisionConnectPointFlows() {
        if ((!connectPointMode) || connectPoint == null) {
            return;
        }

        processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
    }
    private void unprovisionConnectPointFlows() {
        if (connectPoint == null) {
            return;
        }
        processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
    }
}
