Brian Waters | 13d9601 | 2017-12-08 16:53:31 -0600 | [diff] [blame] | 1 | /********************************************************************************************************* |
| 2 | * Software License Agreement (BSD License) * |
| 3 | * Author: Sebastien Decugis <sdecugis@freediameter.net> * |
| 4 | * * |
| 5 | * Copyright (c) 2013, WIDE Project and NICT * |
| 6 | * All rights reserved. * |
| 7 | * * |
| 8 | * Redistribution and use of this software in source and binary forms, with or without modification, are * |
| 9 | * permitted provided that the following conditions are met: * |
| 10 | * * |
| 11 | * * Redistributions of source code must retain the above * |
| 12 | * copyright notice, this list of conditions and the * |
| 13 | * following disclaimer. * |
| 14 | * * |
| 15 | * * Redistributions in binary form must reproduce the above * |
| 16 | * copyright notice, this list of conditions and the * |
| 17 | * following disclaimer in the documentation and/or other * |
| 18 | * materials provided with the distribution. * |
| 19 | * * |
| 20 | * * Neither the name of the WIDE Project or NICT nor the * |
| 21 | * names of its contributors may be used to endorse or * |
| 22 | * promote products derived from this software without * |
| 23 | * specific prior written permission of WIDE Project and * |
| 24 | * NICT. * |
| 25 | * * |
| 26 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED * |
| 27 | * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A * |
| 28 | * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR * |
| 29 | * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * |
| 30 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * |
| 31 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR * |
| 32 | * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * |
| 33 | * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * |
| 34 | *********************************************************************************************************/ |
| 35 | |
| 36 | #include "test_netemul.h" |
| 37 | #include <math.h> |
| 38 | |
| 39 | /* This file implements the real processing of the message. |
| 40 | The entry point is tne_process_message(). |
| 41 | |
   First, the duplicate filter is applied: with the configured
   probability, a copy of the message is created. Then, with one
   tenth of that probability, a second copy is created, and so on
   (each extra copy being ten times less likely than the previous
   one), until the random draw says not to create another copy.
| 46 | |
| 47 | The original message + all copies are stored in a list, for next step. |
| 48 | |
| 49 | Second step is the latency filter. For each message in the list, a |
| 50 | latency value is randomly generated (with a lognormal shape of the |
| 51 | random distribution) and stored in the list. |
| 52 | |
| 53 | Finally, when the latency time is over, the message is sent. |
| 54 | */ |
| 55 | |
/* Synchronization between tne_process_message() (producer) and the processing thread (consumer). */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
/* Signaled by tne_process_message() after queuing an item; the processing thread waits on it. */
static pthread_cond_t cnd = PTHREAD_COND_INITIALIZER;
/* The processing thread; created in tne_process_init() and terminated in tne_process_fini(). */
static pthread_t thr = (pthread_t)NULL;

/* The lists below are all protected by the same mutex mtx */
/* A process_item moves through them in this order: input -> forlat -> waitlist. */
static struct fd_list input = FD_LIST_INITIALIZER(input); /* messages received from network */
static struct fd_list forlat = FD_LIST_INITIALIZER(forlat); /* messages after duplicate filter */
static struct fd_list waitlist = FD_LIST_INITIALIZER(waitlist); /* messages waiting for sending; do_latency() keeps it ordered by increasing ts */
| 64 | |
/* One message travelling through the lists of this file. */
/* NOTE: "chain" must stay the first member: the code in this file casts */
/* struct fd_list pointers taken from the lists directly to struct process_item pointers. */
struct process_item {
	struct fd_list chain; /* link into one of the lists. "o" points to the message. */
	struct timespec ts; /* when the message must be sent (CLOCK_REALTIME reception time, plus the latency added by do_latency) */
};
| 69 | |
| 70 | /******************************************************************/ |
| 71 | /* helper functions */ |
| 72 | |
| 73 | /* Process all pi in input list and queue in forlat, duplicating when needed */ |
| 74 | static int do_duplicates() |
| 75 | { |
| 76 | TRACE_ENTRY(""); |
| 77 | |
| 78 | while (!FD_IS_LIST_EMPTY(&input)) { |
| 79 | struct msg * m; |
| 80 | struct process_item * pi = (struct process_item *)(input.next); |
| 81 | |
| 82 | /* Take out this pi from input */ |
| 83 | fd_list_unlink(&pi->chain); |
| 84 | |
| 85 | /* store it in forlat */ |
| 86 | fd_list_insert_before(&forlat, &pi->chain); |
| 87 | |
| 88 | /* Duplicate eventually, unless deactivated */ |
| 89 | if (tne_conf.dupl_proba != 0.0) { |
| 90 | DiamId_t src; |
| 91 | size_t srclen; |
| 92 | /* Pick a random value in [0, 1] */ |
| 93 | double my_rand = drand48(); |
| 94 | m = pi->chain.o; |
| 95 | CHECK_FCT( fd_msg_source_get(m, &src, &srclen) ); |
| 96 | |
| 97 | while (my_rand < (double) tne_conf.dupl_proba) { |
| 98 | /* create the duplicate */ |
| 99 | struct process_item * npi; |
| 100 | struct msg * nm; |
| 101 | struct msg_hdr * nh; |
| 102 | unsigned char * buf; |
| 103 | size_t len; |
| 104 | |
| 105 | /* Duplicate the message */ |
| 106 | CHECK_FCT( fd_msg_bufferize(m, &buf, &len) ); |
| 107 | CHECK_FCT( fd_msg_parse_buffer(&buf, len, &nm) ); |
| 108 | CHECK_FCT( fd_msg_source_set(nm, src, srclen) ); |
| 109 | CHECK_FCT( fd_msg_hdr(nm, &nh) ); |
| 110 | nh->msg_flags |= CMD_FLAG_RETRANSMIT; /* Add the 'T' flag */ |
| 111 | TRACE_DEBUG(FULL, "[tne] Duplicated message %p as %p", m, nm); |
| 112 | |
| 113 | /* Duplicate the pi */ |
| 114 | CHECK_MALLOC( npi = malloc(sizeof(struct process_item)) ); |
| 115 | memset(npi, 0, sizeof(struct process_item)); |
| 116 | fd_list_init(&npi->chain, nm); |
| 117 | memcpy(&npi->ts, &pi->ts, sizeof(struct timespec)); |
| 118 | |
| 119 | /* Enqueue the candidate in forlat */ |
| 120 | fd_list_insert_before(&forlat, &npi->chain); |
| 121 | |
| 122 | /* loop for another duplicate */ |
| 123 | if (!my_rand) |
| 124 | break; /* otherwise, infinite loop */ |
| 125 | my_rand *= 10.0; |
| 126 | } |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | /* Done */ |
| 131 | return 0; |
| 132 | } |
| 133 | |
| 134 | /* Generate a random value with a normal distribution, mean 0, variance 1 */ |
| 135 | /* Using Box-Muller algo from Numerical Recipes in C++, 2nd Ed. */ |
| 136 | static double get_rand_norm() |
| 137 | { |
| 138 | double ru1, ru2; /* two random uniform values in -1..1 */ |
| 139 | double rsq; /* ru1^2 + ru2^2, to ensure we are in the circle */ |
| 140 | |
| 141 | /* Get our appropriate 2 random uniform values */ |
| 142 | do { |
| 143 | ru1 = 2.0 * drand48() - 1.0; |
| 144 | ru2 = 2.0 * drand48() - 1.0; |
| 145 | rsq = ru1 * ru1 + ru2 * ru2; |
| 146 | } while ((rsq >= 1.0) || (rsq == 0.0)); |
| 147 | |
| 148 | /* Do the Box-Muller transform -- we don't use the 2nd value generated */ |
| 149 | return ru1 * sqrt( -2.0 * log(rsq) / rsq ); |
| 150 | } |
| 151 | |
| 152 | /* Return the latency to add, in ms. */ |
| 153 | static __inline__ unsigned long get_latency() |
| 154 | { |
| 155 | unsigned long lat = tne_conf.lat_avg; |
| 156 | |
| 157 | if (tne_conf.lat_dev) { |
| 158 | /* We randomize the value to add */ |
| 159 | double rn; |
| 160 | |
| 161 | rn = get_rand_norm(); /* this is normal random value with mean = 0 and variance = 1 */ |
| 162 | rn = rn * ((double)tne_conf.lat_dev) / 100.0; /* now the variance is lat_dev */ |
| 163 | rn = exp(rn); /* and now, we have a lognormal random value, with geometric mean = 1 */ |
| 164 | |
| 165 | lat = (unsigned long)(rn * (double)lat); /* Apply to our mean latency */ |
| 166 | } |
| 167 | |
| 168 | return lat; |
| 169 | } |
| 170 | |
| 171 | /* Process all pi in forlat and add a random latency, then requeue in order into waitlist */ |
| 172 | static int do_latency() |
| 173 | { |
| 174 | TRACE_ENTRY(""); |
| 175 | |
| 176 | while (!FD_IS_LIST_EMPTY(&forlat)) { |
| 177 | struct process_item * pi = (struct process_item *)(forlat.next); |
| 178 | struct fd_list * li; |
| 179 | |
| 180 | /* Take out this pi from forlat */ |
| 181 | fd_list_unlink(&pi->chain); |
| 182 | |
| 183 | /* If there is a latency to add */ |
| 184 | if (tne_conf.lat_avg) { |
| 185 | unsigned long l = get_latency(); |
| 186 | TRACE_DEBUG(FULL, "[tne] Set %lu ms latency for %p", l, pi->chain.o); |
| 187 | pi->ts.tv_sec += l / 1000; |
| 188 | l %= 1000; |
| 189 | pi->ts.tv_nsec += l * 1000000; |
| 190 | if (pi->ts.tv_nsec >= 1000000000) { |
| 191 | pi->ts.tv_sec += 1; |
| 192 | pi->ts.tv_nsec -= 1000000000; |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | for (li = waitlist.prev; li != &waitlist; li=li->prev) { |
| 197 | struct process_item * p = (struct process_item *)li; |
| 198 | if (TS_IS_INFERIOR( &p->ts, &pi->ts )) |
| 199 | break; /* we must insert after this one */ |
| 200 | } |
| 201 | |
| 202 | /* store it */ |
| 203 | fd_list_insert_after(li, &pi->chain); |
| 204 | } |
| 205 | |
| 206 | /* Done */ |
| 207 | return 0; |
| 208 | } |
| 209 | |
| 210 | /* Send all messages in waitlist that have passed their latency period */ |
| 211 | static int send_all_ready() |
| 212 | { |
| 213 | struct timespec now; |
| 214 | |
| 215 | TRACE_ENTRY(""); |
| 216 | |
| 217 | CHECK_SYS( clock_gettime(CLOCK_REALTIME, &now) ); |
| 218 | |
| 219 | while (!FD_IS_LIST_EMPTY(&waitlist)) { |
| 220 | struct msg * m; |
| 221 | struct process_item * pi = (struct process_item *)(waitlist.next); |
| 222 | |
| 223 | if (!TS_IS_INFERIOR( &pi->ts, &now)) |
| 224 | break; /* We sent already all we could */ |
| 225 | |
| 226 | /* Take out this pi and send the message */ |
| 227 | fd_list_unlink(&pi->chain); |
| 228 | m = pi->chain.o; |
| 229 | free(pi); |
| 230 | |
| 231 | TRACE_DEBUG(FULL, "[tne] Sending now %p", m); |
| 232 | CHECK_FCT( fd_msg_send(&m, NULL, NULL) ); |
| 233 | } |
| 234 | |
| 235 | return 0; |
| 236 | |
| 237 | } |
| 238 | |
| 239 | |
| 240 | /******************************************************************/ |
| 241 | /* the processing thread */ |
| 242 | static void * tne_process_th(void * arg) |
| 243 | { |
| 244 | TRACE_ENTRY("%p", arg); |
| 245 | |
| 246 | /* Name the thread */ |
| 247 | fd_log_threadname ( "test_netemul/process" ); |
| 248 | |
| 249 | CHECK_POSIX_DO( pthread_mutex_lock(&mtx), goto error ); |
| 250 | pthread_cleanup_push( fd_cleanup_mutex, &mtx ); |
| 251 | |
| 252 | /* The loop */ |
| 253 | while (1) { |
| 254 | /* First, test if we are canceled */ |
| 255 | pthread_testcancel(); |
| 256 | |
| 257 | /* Send all messages that are ready (free resources before using new ones) */ |
| 258 | CHECK_FCT_DO( send_all_ready(), break ); |
| 259 | |
| 260 | /* Now process the new messages in input list for duplicate filter */ |
| 261 | CHECK_FCT_DO( do_duplicates(), break ); |
| 262 | |
| 263 | /* Now compute the latency for each new item */ |
| 264 | CHECK_FCT_DO( do_latency(), break ); |
| 265 | |
| 266 | /* Now, wait then loop */ |
| 267 | if (FD_IS_LIST_EMPTY(&waitlist)) { |
| 268 | CHECK_POSIX_DO( pthread_cond_wait(&cnd, &mtx), break ); |
| 269 | } else { |
| 270 | CHECK_POSIX_DO2( pthread_cond_timedwait(&cnd, &mtx, &((struct process_item *)(waitlist.next))->ts), |
| 271 | ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */, |
| 272 | /* on other error, */ break ); |
| 273 | } |
| 274 | |
| 275 | /* loop */ |
| 276 | } |
| 277 | |
| 278 | pthread_cleanup_pop( 0 ); |
| 279 | CHECK_POSIX_DO( pthread_mutex_unlock(&mtx), ); |
| 280 | error: |
| 281 | TRACE_DEBUG(INFO, "A fatal error occurred in test_netemul/process thread!"); |
| 282 | ASSERT(0); |
| 283 | CHECK_FCT_DO(fd_core_shutdown(), ); |
| 284 | return NULL; |
| 285 | } |
| 286 | |
| 287 | /******************************************************************/ |
| 288 | /* functions visible from outside this file */ |
/* Start the processing thread (tne_process_th); its handle is stored in thr. */
/* Returns 0 on success; on failure CHECK_POSIX presumably returns the pthread_create error code. */
int tne_process_init()
{
	CHECK_POSIX( pthread_create(&thr, NULL, tne_process_th, NULL) );

	#if 0 /* debug */
	/* Disabled: prints 20 sample latencies to eyeball the random distribution */
	int i;
	for (i=0; i< 20; i++) {
		printf("LAT: %lu\n", get_latency());
	}
	#endif /* 0 */

	return 0;
}
| 302 | |
| 303 | |
/* Terminate the processing thread started by tne_process_init(). */
/* Items still linked in input, forlat or waitlist are not released here. */
/* Returns 0 on success; on failure CHECK_FCT presumably returns the fd_thr_term error code. */
int tne_process_fini()
{
	CHECK_FCT( fd_thr_term(&thr) );
	return 0;
}
| 309 | |
| 310 | |
| 311 | int tne_process_message(struct msg * msg) |
| 312 | { |
| 313 | struct process_item * pi; |
| 314 | |
| 315 | TRACE_ENTRY("%p", msg); |
| 316 | |
| 317 | /* Create a new pi for this message */ |
| 318 | CHECK_MALLOC( pi = malloc(sizeof(struct process_item)) ); |
| 319 | memset(pi, 0, sizeof(struct process_item)); |
| 320 | fd_list_init(&pi->chain, msg); |
| 321 | CHECK_SYS(clock_gettime(CLOCK_REALTIME, &pi->ts)); |
| 322 | |
| 323 | /* Store it in the input list */ |
| 324 | CHECK_POSIX( pthread_mutex_lock(&mtx) ); |
| 325 | fd_list_insert_before(&input, &pi->chain); |
| 326 | CHECK_POSIX( pthread_mutex_unlock(&mtx) ); |
| 327 | |
| 328 | /* Wake up the process thread so that it processes the message */ |
| 329 | CHECK_POSIX( pthread_cond_signal(&cnd) ); |
| 330 | |
| 331 | /* done */ |
| 332 | return 0; |
| 333 | } |