Fixed some bugs

- Update br-int rather than do nothing if it already exists
- Make only the leader performs node bootstrap
- Check mastership on HOST event not flow rule populator
- Install/uninstall flow rules for vSG always from master, or the rules
  stay in PENDING_ADDED state

Change-Id: I4bd5cf6f84bf36f2617288b2d843435819c76ba8
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtn.java b/src/main/java/org/onosproject/cordvtn/CordVtn.java
index a08bd35..e852cf9 100644
--- a/src/main/java/org/onosproject/cordvtn/CordVtn.java
+++ b/src/main/java/org/onosproject/cordvtn/CordVtn.java
@@ -145,6 +145,7 @@
     private static final String DATA_PLANE_IP = "dataPlaneIp";
     private static final String DATA_PLANE_INTF = "dataPlaneIntf";
     private static final String S_TAG = "stag";
+    private static final String VSG_HOST_ID = "vsgHostId";
 
     private static final Ip4Address DEFAULT_DNS = Ip4Address.valueOf("8.8.8.8");
 
@@ -175,7 +176,6 @@
                                                  deviceService,
                                                  driverService,
                                                  groupService,
-                                                 mastershipService,
                                                  DEFAULT_TUNNEL);
 
         arpProxy = new CordVtnArpProxy(appId, packetService, hostService);
@@ -301,30 +301,28 @@
     @Override
     public void updateVirtualSubscriberGateways(HostId vSgHostId, String serviceVlan,
                                                 Map<IpAddress, MacAddress> vSgs) {
-        Host vSgVm = hostService.getHost(vSgHostId);
-
-        if (vSgVm == null || !vSgVm.annotations().value(S_TAG).equals(serviceVlan)) {
+        Host vSgHost = hostService.getHost(vSgHostId);
+        if (vSgHost == null || !vSgHost.annotations().value(S_TAG).equals(serviceVlan)) {
             log.debug("Invalid vSG updates for {}", serviceVlan);
             return;
         }
 
-        log.info("Updates vSGs in {} with {}", vSgVm.id(), vSgs.toString());
+        log.info("Updates vSGs in {} with {}", vSgHost.id(), vSgs.toString());
         vSgs.entrySet().stream()
+                .filter(entry -> hostService.getHostsByMac(entry.getValue()).isEmpty())
                 .forEach(entry -> addVirtualSubscriberGateway(
-                        vSgVm,
+                        vSgHost,
                         entry.getKey(),
                         entry.getValue(),
                         serviceVlan));
 
-        hostService.getConnectedHosts(vSgVm.location()).stream()
-                .filter(host -> !host.mac().equals(vSgVm.mac()))
+        hostService.getConnectedHosts(vSgHost.location()).stream()
+                .filter(host -> !host.mac().equals(vSgHost.mac()))
                 .filter(host -> !vSgs.values().contains(host.mac()))
                 .forEach(host -> {
                     log.info("Removed vSG {}", host.toString());
                     hostProvider.hostVanished(host.id());
                 });
-
-        ruleInstaller.populateSubscriberGatewayRules(vSgVm, vSgs.keySet());
     }
 
     /**
@@ -337,16 +335,12 @@
      */
     private void addVirtualSubscriberGateway(Host vSgHost, IpAddress vSgIp, MacAddress vSgMac,
                                              String serviceVlan) {
-        HostId hostId = HostId.hostId(vSgMac);
-        Host host = hostService.getHost(hostId);
-        if (host != null) {
-            log.trace("vSG with {} already exists", vSgMac.toString());
-            return;
-        }
+        log.info("vSG with IP({}) MAC({}) added", vSgIp.toString(), vSgMac.toString());
 
-        log.info("vSG with IP({}) MAC({}) detected", vSgIp.toString(), vSgMac.toString());
+        HostId hostId = HostId.hostId(vSgMac);
         DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
-                .set(S_TAG, serviceVlan);
+                .set(S_TAG, serviceVlan)
+                .set(VSG_HOST_ID, vSgHost.id().toString());
 
         HostDescription hostDesc = new DefaultHostDescription(
                 vSgMac,
@@ -529,6 +523,11 @@
      * @param host host
      */
     private void serviceVmAdded(Host host) {
+        String serviceVlan = host.annotations().value(S_TAG);
+        if (serviceVlan != null) {
+            virtualSubscriberGatewayAdded(host, serviceVlan);
+        }
+
         String vNetId = host.annotations().value(SERVICE_ID);
         if (vNetId == null) {
             // ignore this host, it is not the service VM, or it's a vSG
@@ -538,8 +537,7 @@
         OpenstackNetwork vNet = openstackService.network(vNetId);
         if (vNet == null) {
             log.warn("Failed to get OpenStack network {} for VM {}({}).",
-                     vNetId,
-                     host.id(),
+                     vNetId, host.id(),
                      host.annotations().value(OPENSTACK_VM_ID));
             return;
         }
@@ -572,20 +570,6 @@
 
         registerDhcpLease(host, service);
         ruleInstaller.populateBasicConnectionRules(host, getTunnelIp(host), vNet);
-
-        String serviceVlan = host.annotations().value(S_TAG);
-        if (serviceVlan != null) {
-            log.debug("vSG VM detected {}", host.id());
-            Map<IpAddress, MacAddress> vSgs = getSubscriberGateways(host);
-            vSgs.entrySet().stream()
-                    .forEach(entry -> addVirtualSubscriberGateway(
-                            host,
-                            entry.getKey(),
-                            entry.getValue(),
-                            serviceVlan));
-
-            ruleInstaller.populateSubscriberGatewayRules(host, vSgs.keySet());
-        }
     }
 
     /**
@@ -594,21 +578,21 @@
      * @param host host
      */
     private void serviceVmRemoved(Host host) {
+        String serviceVlan = host.annotations().value(S_TAG);
+        if (serviceVlan != null) {
+            virtualSubscriberGatewayRemoved(host);
+        }
+
         String vNetId = host.annotations().value(SERVICE_ID);
         if (vNetId == null) {
             // ignore it, it's not the service VM or it's a vSG
-            String serviceVlan = host.annotations().value(S_TAG);
-            if (serviceVlan != null) {
-                log.info("vSG {} removed", host.id());
-            }
             return;
         }
 
         OpenstackNetwork vNet = openstackService.network(vNetId);
         if (vNet == null) {
             log.warn("Failed to get OpenStack network {} for VM {}({}).",
-                     vNetId,
-                     host.id(),
+                     vNetId, host.id(),
                      host.annotations().value(OPENSTACK_VM_ID));
             return;
         }
@@ -642,6 +626,62 @@
         }
     }
 
+
+    /**
+     * Handles virtual subscriber gateway VM or container.
+     *
+     * @param host new host with stag, it can be vsg VM or vsg
+     * @param serviceVlan service vlan
+     */
+    private void virtualSubscriberGatewayAdded(Host host, String serviceVlan) {
+        Map<IpAddress, MacAddress> vSgs;
+        Host vSgHost;
+
+        String vSgHostId = host.annotations().value(VSG_HOST_ID);
+        if (vSgHostId == null) {
+            log.debug("vSG VM detected {}", host.id());
+
+            vSgHost = host;
+            vSgs = getSubscriberGateways(vSgHost);
+            vSgs.entrySet().stream().forEach(entry -> addVirtualSubscriberGateway(
+                    vSgHost,
+                    entry.getKey(),
+                    entry.getValue(),
+                    serviceVlan));
+        } else {
+            vSgHost = hostService.getHost(HostId.hostId(vSgHostId));
+            if (vSgHost == null) {
+                return;
+            }
+
+            log.debug("vSG detected {}", host.id());
+            vSgs = getSubscriberGateways(vSgHost);
+        }
+
+        ruleInstaller.populateSubscriberGatewayRules(vSgHost, vSgs.keySet());
+    }
+
+    /**
+     * Handles virtual subscriber gateway removed.
+     *
+     * @param vSg vsg host to remove
+     */
+    private void virtualSubscriberGatewayRemoved(Host vSg) {
+        String vSgHostId = vSg.annotations().value(VSG_HOST_ID);
+        if (vSgHostId == null) {
+            return;
+        }
+
+        Host vSgHost = hostService.getHost(HostId.hostId(vSgHostId));
+        if (vSgHost == null) {
+            return;
+        }
+
+        log.info("vSG removed {}", vSg.id());
+        Map<IpAddress, MacAddress> vSgs = getSubscriberGateways(vSgHost);
+        ruleInstaller.populateSubscriberGatewayRules(vSgHost, vSgs.keySet());
+    }
+
     /**
      * Sets service network gateway MAC address and sends out gratuitous ARP to all
      * VMs to update the gateway MAC address.
@@ -709,10 +749,14 @@
 
             switch (event.type()) {
                 case HOST_ADDED:
-                    eventExecutor.submit(() -> serviceVmAdded(host));
+                    if (mastershipService.isLocalMaster(host.location().deviceId())) {
+                        eventExecutor.submit(() -> serviceVmAdded(host));
+                    }
                     break;
                 case HOST_REMOVED:
-                    eventExecutor.submit(() -> serviceVmRemoved(host));
+                    if (mastershipService.isLocalMaster(host.location().deviceId())) {
+                        eventExecutor.submit(() -> serviceVmRemoved(host));
+                    }
                     break;
                 default:
                     break;
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java b/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java
index d4aceeb..bcd76bc 100644
--- a/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java
+++ b/src/main/java/org/onosproject/cordvtn/CordVtnNodeManager.java
@@ -27,9 +27,10 @@
 import org.onlab.util.ItemNotFoundException;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.Device;
@@ -70,6 +71,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
@@ -149,7 +151,7 @@
     protected FlowRuleService flowRuleService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MastershipService mastershipService;
+    protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupService groupService;
@@ -169,6 +171,7 @@
     private ConsistentMap<String, CordVtnNode> nodeStore;
     private CordVtnRuleInstaller ruleInstaller;
     private ApplicationId appId;
+    private NodeId localNodeId;
 
     private enum NodeState implements CordVtnNodeState {
 
@@ -217,6 +220,9 @@
     @Activate
     protected void active() {
         appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+
         nodeStore = storageService.<String, CordVtnNode>consistentMapBuilder()
                 .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
                 .withName("cordvtn-nodestore")
@@ -227,7 +233,6 @@
                                                  deviceService,
                                                  driverService,
                                                  groupService,
-                                                 mastershipService,
                                                  DEFAULT_TUNNEL);
 
         deviceService.addListener(deviceListener);
@@ -242,6 +247,7 @@
 
         eventExecutor.shutdown();
         nodeStore.clear();
+        leadershipService.withdraw(appId.name());
     }
 
     /**
@@ -285,6 +291,13 @@
             return;
         }
 
+        NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+        log.debug("Node init requested, local: {} leader: {}", localNodeId, leaderNodeId);
+        if (!Objects.equals(localNodeId, leaderNodeId)) {
+            // only the leader performs node init
+            return;
+        }
+
         NodeState state = getNodeState(node);
         log.debug("Init node: {} state: {}", node.hostname(), state.toString());
         state.process(this, node);
@@ -839,6 +852,12 @@
         @Override
         public void event(DeviceEvent event) {
 
+            NodeId leaderNodeId = leadershipService.getLeader(appId.name());
+            if (!Objects.equals(localNodeId, leaderNodeId)) {
+                // only the leader processes events
+                return;
+            }
+
             Device device = event.subject();
             ConnectionHandler<Device> handler =
                     (device.type().equals(SWITCH) ? bridgeHandler : ovsdbHandler);
diff --git a/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java b/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java
index 4c67587..f3877e0 100644
--- a/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java
+++ b/src/main/java/org/onosproject/cordvtn/CordVtnRuleInstaller.java
@@ -30,7 +30,6 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.DefaultGroupId;
 import org.onosproject.core.GroupId;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
@@ -120,13 +119,13 @@
     private static final String PORT_NAME = "portName";
     private static final String DATA_PLANE_INTF = "dataPlaneIntf";
     private static final String S_TAG = "stag";
+    private static final String OVS_HW_VERSION = "Open vSwitch";
 
     private final ApplicationId appId;
     private final FlowRuleService flowRuleService;
     private final DeviceService deviceService;
     private final DriverService driverService;
     private final GroupService groupService;
-    private final MastershipService mastershipService;
     private final String tunnelType;
 
     /**
@@ -137,7 +136,6 @@
      * @param deviceService device service
      * @param driverService driver service
      * @param groupService group service
-     * @param mastershipService mastership service
      * @param tunnelType tunnel type
      */
     public CordVtnRuleInstaller(ApplicationId appId,
@@ -145,14 +143,12 @@
                                 DeviceService deviceService,
                                 DriverService driverService,
                                 GroupService groupService,
-                                MastershipService mastershipService,
                                 String tunnelType) {
         this.appId = appId;
         this.flowRuleService = flowRuleService;
         this.deviceService = deviceService;
         this.driverService = driverService;
         this.groupService = groupService;
-        this.mastershipService = mastershipService;
         this.tunnelType = checkNotNull(tunnelType);
     }
 
@@ -187,10 +183,6 @@
         checkNotNull(vNet);
 
         DeviceId deviceId = host.location().deviceId();
-        if (!mastershipService.isLocalMaster(deviceId)) {
-            return;
-        }
-
         PortNumber inPort = host.location().port();
         MacAddress dstMac = host.mac();
         IpAddress hostIp = host.ipAddresses().stream().findFirst().get();
@@ -225,10 +217,6 @@
         PortNumber port = host.location().port();
         IpAddress ip = host.ipAddresses().stream().findFirst().orElse(null);
 
-        if (!mastershipService.isLocalMaster(deviceId)) {
-            return;
-        }
-
         for (FlowRule flowRule : flowRuleService.getFlowRulesById(appId)) {
             if (flowRule.deviceId().equals(deviceId)) {
                 PortNumber inPort = getInPort(flowRule);
@@ -284,6 +272,10 @@
         Map<DeviceId, Set<PortNumber>> inPorts = Maps.newHashMap();
 
         for (Device device : deviceService.getAvailableDevices(SWITCH)) {
+            if (!device.hwVersion().equals(OVS_HW_VERSION)) {
+                continue;
+            }
+
             GroupId groupId = createServiceGroup(device.id(), pService);
             outGroups.put(device.id(), groupId);
 
@@ -320,12 +312,16 @@
         Map<DeviceId, GroupId> outGroups = Maps.newHashMap();
         GroupKey groupKey = new DefaultGroupKey(pService.id().id().getBytes());
 
-        deviceService.getAvailableDevices(SWITCH).forEach(device -> {
+        for (Device device : deviceService.getAvailableDevices(SWITCH)) {
+            if (!device.hwVersion().equals(OVS_HW_VERSION)) {
+                continue;
+            }
+
             Group group = groupService.getGroup(device.id(), groupKey);
             if (group != null) {
                 outGroups.put(device.id(), group.id());
             }
-        });
+        }
 
         for (FlowRule flowRule : flowRuleService.getFlowRulesById(appId)) {
             IpPrefix dstIp = getDstIpFromSelector(flowRule);
@@ -368,11 +364,11 @@
         GroupKey groupKey = getGroupKey(service.id());
 
         for (Device device : deviceService.getAvailableDevices(SWITCH)) {
-            DeviceId deviceId = device.id();
-            if (!mastershipService.isLocalMaster(deviceId)) {
+            if (!device.hwVersion().equals(OVS_HW_VERSION)) {
                 continue;
             }
 
+            DeviceId deviceId = device.id();
             Group group = groupService.getGroup(deviceId, groupKey);
             if (group == null) {
                 log.trace("No group exists for service {} in {}, do nothing.", service.id(), deviceId);
@@ -421,10 +417,6 @@
         DeviceId deviceId = host.location().deviceId();
         IpAddress hostIp = host.ipAddresses().stream().findFirst().get();
 
-        if (!mastershipService.isLocalMaster(deviceId)) {
-            return;
-        }
-
         TrafficSelector selector = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_ARP)
                 .matchArpTpa(mService.serviceIp().getIp4Address())
@@ -520,10 +512,6 @@
      */
     public void removeManagementNetworkRules(Host host, CordService mService) {
         checkNotNull(mService);
-
-        if (!mastershipService.isLocalMaster(host.location().deviceId())) {
-            return;
-        }
         // TODO remove management network specific rules
     }
 
@@ -980,6 +968,10 @@
                 .build();
 
         for (Device device : deviceService.getAvailableDevices(SWITCH)) {
+            if (!device.hwVersion().equals(OVS_HW_VERSION)) {
+                continue;
+            }
+
             FlowRule flowRuleDirect = DefaultFlowRule.builder()
                     .fromApp(appId)
                     .withSelector(selector)
@@ -1011,6 +1003,10 @@
                 .build();
 
         for (Device device : deviceService.getAvailableDevices(SWITCH)) {
+            if (!device.hwVersion().equals(OVS_HW_VERSION)) {
+                continue;
+            }
+
             FlowRule flowRuleDirect = DefaultFlowRule.builder()
                     .fromApp(appId)
                     .withSelector(selector)
@@ -1138,6 +1134,10 @@
         processFlowRule(true, flowRule);
 
         for (Device device : deviceService.getAvailableDevices(SWITCH)) {
+            if (!device.hwVersion().equals(OVS_HW_VERSION)) {
+                continue;
+            }
+
             if (device.id().equals(deviceId)) {
                 continue;
             }