blob: 8c29a2f7760f1258de65ee5a54f29c9c0f05de24 [file] [log] [blame]
Ilayda Ozdemir4c5947c2020-05-05 13:14:32 +00001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.igmpproxy.impl;
17
18import org.onlab.packet.Ip4Address;
19import org.onosproject.net.DeviceId;
20import org.onosproject.net.PortNumber;
21import org.opencord.igmpproxy.IgmpLeadershipService;
22import org.opencord.igmpproxy.statemachine.StateMachine;
23import org.opencord.igmpproxy.statemachine.StateMachineId;
24import org.opencord.igmpproxy.statemachine.StateMachineService;
25import org.opencord.igmpproxy.impl.store.machine.StateMachineStore;
26import org.osgi.service.component.ComponentContext;
27import org.osgi.service.component.annotations.Component;
28import org.osgi.service.component.annotations.Reference;
29import org.osgi.service.component.annotations.ReferenceCardinality;
30import org.osgi.service.component.annotations.Activate;
31import org.osgi.service.component.annotations.Deactivate;
32import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
35import java.util.Collection;
36
37
38/**
39 * State machine for whole IGMP process. The state machine is implemented on
40 * RFC 2236 "6. Host State Diagram".
41 */
42@Component(immediate = true, service = StateMachineService.class)
43public class StateMachineManager implements StateMachineService {
44 // Only for tests purposes
45 public static boolean sendQuery = true;
46
47 private final Logger log = LoggerFactory.getLogger(getClass());
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY)
50 protected StateMachineStore stateMachineStore;
51
52 @Reference(cardinality = ReferenceCardinality.MANDATORY)
53 protected IgmpLeadershipService igmpLeadershipService;
54
55 @Activate
56 public void activate(ComponentContext context) {
57 log.info("Started.");
58 }
59
60 @Deactivate
61 public void deactivate(ComponentContext context) {
62 log.info("Stopped.");
63 }
64
65 private StateMachine get(DeviceId devId, Ip4Address groupIp) {
66 StateMachineId id = StateMachineId.of(devId, groupIp);
67 return stateMachineStore.getStateMachine(id);
68 }
69
70 public void destroySingle(DeviceId devId, Ip4Address groupIp) {
71 StateMachine machine = get(devId, groupIp);
72 if (machine == null) {
73 log.debug("Machine has already been destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
74 return;
75 }
76 stateMachineStore.removeStateMachine(machine.getStateMachineId());
77 stateMachineStore.removeCounter(machine.getStateMachineId());
78 }
79
80 @Override
81 public boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
82 StateMachine machineInstance = get(devId, groupIp);
83
84 if (machineInstance == null) {
85 machineInstance = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
86 stateMachineStore.putStateMachine(machineInstance);
87 stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
88 log.debug("Instance of machine created with id: {}", machineInstance.getStateMachineId());
89 boolean shouldSendJoin = true;
90 if (IgmpManager.isIgmpOnPodBasis() &&
91 groupListenedByOtherDevices(devId, groupIp)) {
92 // unset the flag if igmp messages are evaluated on POD basis
93 // and there are already active members of this group
94 // across the entire POD
95 shouldSendJoin = false;
96 }
97 machineInstance.join(shouldSendJoin);
98 stateMachineStore.updateStateMachine(machineInstance);
99 return true;
100 }
101 log.debug("Instance of machine has already been created. deviceId: {} and groupIp: {}",
102 devId, groupIp);
103 stateMachineStore.increaseAndGetCounter(machineInstance.getStateMachineId());
104 return false;
105 }
106
107 @Override
108 public boolean leave(DeviceId devId, Ip4Address groupIp) {
109 StateMachine machine = get(devId, groupIp);
110 if (machine == null) {
111 log.debug("Machine has already been left and destroyed. deviceId: {} and groupIp: {} ", devId, groupIp);
112 return false;
113 }
114
115 long count = stateMachineStore.decreaseAndGetCounter(machine.getStateMachineId());
116 // make sure machine instance still exists.
117 // it may be removed by the preceding thread
118 if (count == 0) {
119 boolean shouldSendLeave = true;
120 if (IgmpManager.isIgmpOnPodBasis() &&
121 groupListenedByOtherDevices(devId, groupIp)) {
122 // unset the flag if igmp messages are evaluated on POD basis
123 // and there are still active members of this group
124 // across the entire POD
125 shouldSendLeave = false;
126 }
127 machine.leave(shouldSendLeave);
128 destroySingle(devId, groupIp);
129 log.debug("This machine left and destroyed. deviceId: {} and groupIp: {}", devId, groupIp);
130 return true;
131 }
132 log.debug("This machine still has member/members. number of member/members: {}, deviceId: {}, groupIp: {} ",
133 count, devId, groupIp);
134 return false;
135 }
136
137 @Override
138 public void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
139 StateMachine machine = get(devId, groupIp);
140 if (machine == null) {
141 log.debug("Machine is not found. deviceId: {} and groupIp: {} ", devId, groupIp);
142 return;
143 }
144 machine.query(maxResp);
145 stateMachineStore.updateStateMachine(machine);
146 }
147
148 @Override
149 public void generalQuery(DeviceId devId, int maxResp) {
150 for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
151 if (devId.equals(machine.getDeviceId())) {
152 machine.query(maxResp);
153 stateMachineStore.updateStateMachine(machine);
154 }
155 }
156 }
157
158 @Override
159 public void generalQuery(int maxResp) {
160 for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
161 machine.query(maxResp);
162 stateMachineStore.updateStateMachine(machine);
163 }
164 }
165
166 @Override
167 public void timeOut(DeviceId devId, Ip4Address groupIp) {
168 StateMachine machine = get(devId, groupIp);
169 if (machine == null) {
170 log.debug("Machine is not found. deviceId: {} and groupIp: {}", devId, groupIp);
171 return;
172 }
173 machine.timeOut(sendQuery);
174 stateMachineStore.updateStateMachine(machine);
175 }
176
177 @Override
178 public void timeOut1s() {
179 Collection<StateMachine> mapSet = stateMachineStore.getAllStateMachines();
180 for (StateMachine machineInfo : mapSet) {
181 if (machineInfo.getTimeOut() != null) {
182 StateMachineId machineId = machineInfo.getStateMachineId();
183 if (igmpLeadershipService.isLocalLeader(machineId.getDeviceId())) {
184 if (machineInfo.getTimeOut() > 0) {
185 stateMachineStore.decreaseTimeout(machineId);
186 } else {
187 StateMachine machine = stateMachineStore.getStateMachine(machineId);
188 machine.timeOut(sendQuery);
189 machine.destroyTimer();
190 stateMachineStore.updateStateMachine(machine);
191 }
192 }
193 }
194 }
195 }
196
197 @Override
198 public void clearAllMaps() {
199 stateMachineStore.clearAllStateMachines();
200 }
201
202 /**
203 * @param devId id of the device being excluded
204 * @param groupIp group IP address
205 * @return true if this group has at least one listener connected to
206 * any device in the map except for the device specified; false otherwise.
207 */
208 private boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
209 for (StateMachine machine : stateMachineStore.getAllStateMachines()) {
210 if (machine.getStateMachineId().getDeviceId().equals(devId)) {
211 continue;
212 }
213 if (machine.getStateMachineId().getGroupIp().equals(groupIp)) {
214 //means group is being listened by other peers in the domain
215 return true;
216 }
217 }
218 return false;
219 }
220}