blob: 1bff64cc103f4c3511d26ed4ed2dc8770858ccfb [file] [log] [blame]
Brian Waters13d96012017-12-08 16:53:31 -06001/*********************************************************************************************************
2* Software License Agreement (BSD License) *
3* Author: Sebastien Decugis <sdecugis@freediameter.net> *
4* *
5* Copyright (c) 2013, WIDE Project and NICT *
6* All rights reserved. *
7* *
8* Redistribution and use of this software in source and binary forms, with or without modification, are *
9* permitted provided that the following conditions are met: *
10* *
11* * Redistributions of source code must retain the above *
12* copyright notice, this list of conditions and the *
13* following disclaimer. *
14* *
15* * Redistributions in binary form must reproduce the above *
16* copyright notice, this list of conditions and the *
17* following disclaimer in the documentation and/or other *
18* materials provided with the distribution. *
19* *
20* * Neither the name of the WIDE Project or NICT nor the *
21* names of its contributors may be used to endorse or *
22* promote products derived from this software without *
23* specific prior written permission of WIDE Project and *
24* NICT. *
25* *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34*********************************************************************************************************/
35
36#include "test_netemul.h"
37#include <math.h>
38
39/* This file implements the real processing of the message.
40 The entry point is tne_process_message().
41
 First, the duplicate filter is applied: with the configured
 probability, a copy of the message is created. Then, with
 one tenth of that probability, a second copy is created, and
 so on (dividing the probability by ten for each additional copy),
 until the random value says not to create another copy.
46
47 The original message + all copies are stored in a list, for next step.
48
49 Second step is the latency filter. For each message in the list, a
50 latency value is randomly generated (with a lognormal shape of the
51 random distribution) and stored in the list.
52
53 Finally, when the latency time is over, the message is sent.
54 */
55
/* Synchronization between tne_process_message() and the processing thread */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; /* held by the processing thread except while it waits on cnd */
static pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; /* signaled by tne_process_message() after a message is queued in input */
static pthread_t thr = (pthread_t)NULL; /* the processing thread, created in tne_process_init() */

/* The lists below are all protected by the same mutex mtx */
static struct fd_list input = FD_LIST_INITIALIZER(input); /* messages received from network */
static struct fd_list forlat = FD_LIST_INITIALIZER(forlat); /* messages after duplicate filter */
static struct fd_list waitlist = FD_LIST_INITIALIZER(waitlist); /* messages waiting for sending, kept ordered by sending time */
64
/* One message going through the filters, together with the time at which it must be sent.
 * The list link must remain the first member: the code in this file casts
 * struct fd_list pointers taken from the lists directly to struct process_item pointers. */
struct process_item {
	struct fd_list chain; /* link into one of the lists. "o" points to the message. */
	struct timespec ts; /* when the message must be sent */
};
69
70/******************************************************************/
71/* helper functions */
72
73/* Process all pi in input list and queue in forlat, duplicating when needed */
74static int do_duplicates()
75{
76 TRACE_ENTRY("");
77
78 while (!FD_IS_LIST_EMPTY(&input)) {
79 struct msg * m;
80 struct process_item * pi = (struct process_item *)(input.next);
81
82 /* Take out this pi from input */
83 fd_list_unlink(&pi->chain);
84
85 /* store it in forlat */
86 fd_list_insert_before(&forlat, &pi->chain);
87
88 /* Duplicate eventually, unless deactivated */
89 if (tne_conf.dupl_proba != 0.0) {
90 DiamId_t src;
91 size_t srclen;
92 /* Pick a random value in [0, 1] */
93 double my_rand = drand48();
94 m = pi->chain.o;
95 CHECK_FCT( fd_msg_source_get(m, &src, &srclen) );
96
97 while (my_rand < (double) tne_conf.dupl_proba) {
98 /* create the duplicate */
99 struct process_item * npi;
100 struct msg * nm;
101 struct msg_hdr * nh;
102 unsigned char * buf;
103 size_t len;
104
105 /* Duplicate the message */
106 CHECK_FCT( fd_msg_bufferize(m, &buf, &len) );
107 CHECK_FCT( fd_msg_parse_buffer(&buf, len, &nm) );
108 CHECK_FCT( fd_msg_source_set(nm, src, srclen) );
109 CHECK_FCT( fd_msg_hdr(nm, &nh) );
110 nh->msg_flags |= CMD_FLAG_RETRANSMIT; /* Add the 'T' flag */
111 TRACE_DEBUG(FULL, "[tne] Duplicated message %p as %p", m, nm);
112
113 /* Duplicate the pi */
114 CHECK_MALLOC( npi = malloc(sizeof(struct process_item)) );
115 memset(npi, 0, sizeof(struct process_item));
116 fd_list_init(&npi->chain, nm);
117 memcpy(&npi->ts, &pi->ts, sizeof(struct timespec));
118
119 /* Enqueue the candidate in forlat */
120 fd_list_insert_before(&forlat, &npi->chain);
121
122 /* loop for another duplicate */
123 if (!my_rand)
124 break; /* otherwise, infinite loop */
125 my_rand *= 10.0;
126 }
127 }
128 }
129
130 /* Done */
131 return 0;
132}
133
134/* Generate a random value with a normal distribution, mean 0, variance 1 */
135/* Using Box-Muller algo from Numerical Recipes in C++, 2nd Ed. */
136static double get_rand_norm()
137{
138 double ru1, ru2; /* two random uniform values in -1..1 */
139 double rsq; /* ru1^2 + ru2^2, to ensure we are in the circle */
140
141 /* Get our appropriate 2 random uniform values */
142 do {
143 ru1 = 2.0 * drand48() - 1.0;
144 ru2 = 2.0 * drand48() - 1.0;
145 rsq = ru1 * ru1 + ru2 * ru2;
146 } while ((rsq >= 1.0) || (rsq == 0.0));
147
148 /* Do the Box-Muller transform -- we don't use the 2nd value generated */
149 return ru1 * sqrt( -2.0 * log(rsq) / rsq );
150}
151
152/* Return the latency to add, in ms. */
153static __inline__ unsigned long get_latency()
154{
155 unsigned long lat = tne_conf.lat_avg;
156
157 if (tne_conf.lat_dev) {
158 /* We randomize the value to add */
159 double rn;
160
161 rn = get_rand_norm(); /* this is normal random value with mean = 0 and variance = 1 */
162 rn = rn * ((double)tne_conf.lat_dev) / 100.0; /* now the variance is lat_dev */
163 rn = exp(rn); /* and now, we have a lognormal random value, with geometric mean = 1 */
164
165 lat = (unsigned long)(rn * (double)lat); /* Apply to our mean latency */
166 }
167
168 return lat;
169}
170
171/* Process all pi in forlat and add a random latency, then requeue in order into waitlist */
172static int do_latency()
173{
174 TRACE_ENTRY("");
175
176 while (!FD_IS_LIST_EMPTY(&forlat)) {
177 struct process_item * pi = (struct process_item *)(forlat.next);
178 struct fd_list * li;
179
180 /* Take out this pi from forlat */
181 fd_list_unlink(&pi->chain);
182
183 /* If there is a latency to add */
184 if (tne_conf.lat_avg) {
185 unsigned long l = get_latency();
186 TRACE_DEBUG(FULL, "[tne] Set %lu ms latency for %p", l, pi->chain.o);
187 pi->ts.tv_sec += l / 1000;
188 l %= 1000;
189 pi->ts.tv_nsec += l * 1000000;
190 if (pi->ts.tv_nsec >= 1000000000) {
191 pi->ts.tv_sec += 1;
192 pi->ts.tv_nsec -= 1000000000;
193 }
194 }
195
196 for (li = waitlist.prev; li != &waitlist; li=li->prev) {
197 struct process_item * p = (struct process_item *)li;
198 if (TS_IS_INFERIOR( &p->ts, &pi->ts ))
199 break; /* we must insert after this one */
200 }
201
202 /* store it */
203 fd_list_insert_after(li, &pi->chain);
204 }
205
206 /* Done */
207 return 0;
208}
209
210/* Send all messages in waitlist that have passed their latency period */
211static int send_all_ready()
212{
213 struct timespec now;
214
215 TRACE_ENTRY("");
216
217 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &now) );
218
219 while (!FD_IS_LIST_EMPTY(&waitlist)) {
220 struct msg * m;
221 struct process_item * pi = (struct process_item *)(waitlist.next);
222
223 if (!TS_IS_INFERIOR( &pi->ts, &now))
224 break; /* We sent already all we could */
225
226 /* Take out this pi and send the message */
227 fd_list_unlink(&pi->chain);
228 m = pi->chain.o;
229 free(pi);
230
231 TRACE_DEBUG(FULL, "[tne] Sending now %p", m);
232 CHECK_FCT( fd_msg_send(&m, NULL, NULL) );
233 }
234
235 return 0;
236
237}
238
239
240/******************************************************************/
241/* the processing thread */
242static void * tne_process_th(void * arg)
243{
244 TRACE_ENTRY("%p", arg);
245
246 /* Name the thread */
247 fd_log_threadname ( "test_netemul/process" );
248
249 CHECK_POSIX_DO( pthread_mutex_lock(&mtx), goto error );
250 pthread_cleanup_push( fd_cleanup_mutex, &mtx );
251
252 /* The loop */
253 while (1) {
254 /* First, test if we are canceled */
255 pthread_testcancel();
256
257 /* Send all messages that are ready (free resources before using new ones) */
258 CHECK_FCT_DO( send_all_ready(), break );
259
260 /* Now process the new messages in input list for duplicate filter */
261 CHECK_FCT_DO( do_duplicates(), break );
262
263 /* Now compute the latency for each new item */
264 CHECK_FCT_DO( do_latency(), break );
265
266 /* Now, wait then loop */
267 if (FD_IS_LIST_EMPTY(&waitlist)) {
268 CHECK_POSIX_DO( pthread_cond_wait(&cnd, &mtx), break );
269 } else {
270 CHECK_POSIX_DO2( pthread_cond_timedwait(&cnd, &mtx, &((struct process_item *)(waitlist.next))->ts),
271 ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
272 /* on other error, */ break );
273 }
274
275 /* loop */
276 }
277
278 pthread_cleanup_pop( 0 );
279 CHECK_POSIX_DO( pthread_mutex_unlock(&mtx), );
280error:
281 TRACE_DEBUG(INFO, "A fatal error occurred in test_netemul/process thread!");
282 ASSERT(0);
283 CHECK_FCT_DO(fd_core_shutdown(), );
284 return NULL;
285}
286
287/******************************************************************/
288/* functions visible from outside this file */
289int tne_process_init()
290{
291 CHECK_POSIX( pthread_create(&thr, NULL, tne_process_th, NULL) );
292
293 #if 0 /* debug */
294 int i;
295 for (i=0; i< 20; i++) {
296 printf("LAT: %lu\n", get_latency());
297 }
298 #endif /* 0 */
299
300 return 0;
301}
302
303
/* Terminate the processing thread created by tne_process_init().
 * Returns 0 when fd_thr_term() succeeds; a failure is handled by the CHECK_FCT macro.
 * NOTE(review): items still queued in input / forlat / waitlist are not released here -- confirm whether that is intended. */
int tne_process_fini()
{
	CHECK_FCT( fd_thr_term(&thr) );
	return 0;
}
309
310
311int tne_process_message(struct msg * msg)
312{
313 struct process_item * pi;
314
315 TRACE_ENTRY("%p", msg);
316
317 /* Create a new pi for this message */
318 CHECK_MALLOC( pi = malloc(sizeof(struct process_item)) );
319 memset(pi, 0, sizeof(struct process_item));
320 fd_list_init(&pi->chain, msg);
321 CHECK_SYS(clock_gettime(CLOCK_REALTIME, &pi->ts));
322
323 /* Store it in the input list */
324 CHECK_POSIX( pthread_mutex_lock(&mtx) );
325 fd_list_insert_before(&input, &pi->chain);
326 CHECK_POSIX( pthread_mutex_unlock(&mtx) );
327
328 /* Wake up the process thread so that it processes the message */
329 CHECK_POSIX( pthread_cond_signal(&cnd) );
330
331 /* done */
332 return 0;
333}