blob: cf5b7fc67e9430e8369d29a4744bb375dc1b527b [file] [log] [blame]
Brian Waters13d96012017-12-08 16:53:31 -06001/*********************************************************************************************************
2* Software License Agreement (BSD License) *
3* Author: Sebastien Decugis <sdecugis@freediameter.net> *
4* *
5* Copyright (c) 2013, WIDE Project and NICT *
6* All rights reserved. *
7* *
8* Redistribution and use of this software in source and binary forms, with or without modification, are *
9* permitted provided that the following conditions are met: *
10* *
11* * Redistributions of source code must retain the above *
12* copyright notice, this list of conditions and the *
13* following disclaimer. *
14* *
15* * Redistributions in binary form must reproduce the above *
16* copyright notice, this list of conditions and the *
17* following disclaimer in the documentation and/or other *
18* materials provided with the distribution. *
19* *
20* * Neither the name of the WIDE Project or NICT nor the *
21* names of its contributors may be used to endorse or *
22* promote products derived from this software without *
23* specific prior written permission of WIDE Project and *
24* NICT. *
25* *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34*********************************************************************************************************/
35
36/* FIFO queues module.
37 *
38 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
39 * This is the default state and type on thread creation.
40 *
41 * In order to destroy properly a queue, the application must:
42 * -> shutdown any process that can add into the queue first.
43 * -> pthread_cancel any thread that could be waiting on the queue.
44 * -> consume any element that is in the queue, using fd_qu_tryget_int.
45 * -> then destroy the queue using fd_mq_del.
46 */
47
48#include "fdproto-internal.h"
49
50/* Definition of a FIFO queue object */
51struct fifo {
52 int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
53
54 pthread_mutex_t mtx; /* Mutex protecting this queue */
55 pthread_cond_t cond_pull; /* condition variable for pulling threads */
56 pthread_cond_t cond_push; /* condition variable for pushing threads */
57
58 struct fd_list list; /* sentinel for the list of elements */
59 int count; /* number of objects in the list */
60 int thrs; /* number of threads waiting for a new element (when count is 0) */
61
62 int max; /* maximum number of items to accept if not 0 */
63 int thrs_push; /* number of threads waitnig to push an item */
64
65 uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */
66 uint16_t low; /* Low level threshhold */
67 void *data; /* Opaque pointer for threshold callbacks */
68 void (*h_cb)(struct fifo *, void **); /* The callbacks */
69 void (*l_cb)(struct fifo *, void **);
70 int highest;/* The highest count value for which h_cb has been called */
71 int highest_ever; /* The max count value this queue has reached (for tweaking) */
72
73 long long total_items; /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
74 struct timespec total_time; /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
75 struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
76 struct timespec last_time; /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
77
78};
79
80struct fifo_item {
81 struct fd_list item;
82 struct timespec posted_on;
83};
84
85/* The eye catcher value */
86#define FIFO_EYEC 0xe7ec1130
87
88/* Macro to check a pointer */
89#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )
90
91
92/* Create a new queue, with max number of items -- use 0 for no max */
93int fd_fifo_new ( struct fifo ** queue, int max )
94{
95 struct fifo * new;
96
97 TRACE_ENTRY( "%p", queue );
98
99 CHECK_PARAMS( queue );
100
101 /* Create a new object */
102 CHECK_MALLOC( new = malloc (sizeof (struct fifo) ) );
103
104 /* Initialize the content */
105 memset(new, 0, sizeof(struct fifo));
106
107 new->eyec = FIFO_EYEC;
108 CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
109 CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
110 CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
111 new->max = max;
112
113 fd_list_init(&new->list, NULL);
114
115 /* We're done */
116 *queue = new;
117 return 0;
118}
119
120/* Dump the content of a queue */
121DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
122{
123 FD_DUMP_HANDLE_OFFSET();
124
125 if (name) {
126 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
127 } else {
128 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
129 }
130
131 if (!CHECK_FIFO( queue )) {
132 return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
133 }
134
135 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), /* continue */ );
136 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
137 queue->count, queue->highest_ever, queue->max,
138 queue->thrs, queue->thrs_push,
139 queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
140 queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
141 goto error);
142
143 if (dump_item) {
144 struct fd_list * li;
145 int i = 0;
146 for (li = queue->list.next; li != &queue->list; li = li->next) {
147 struct fifo_item * fi = (struct fifo_item *)li;
148 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
149 i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
150 goto error);
151 CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
152 }
153 }
154 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ );
155
156 return *buf;
157error:
158 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), /* continue */ );
159 return NULL;
160}
161
162/* Delete a queue. It must be empty. */
163int fd_fifo_del ( struct fifo ** queue )
164{
165 struct fifo * q;
166 int loops = 0;
167
168 TRACE_ENTRY( "%p", queue );
169
170 CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
171
172 q = *queue;
173
174 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
175
176 if ((q->count != 0) || (q->data != NULL)) {
177 TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
178 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* no fallback */ );
179 return EINVAL;
180 }
181
182 /* Ok, now invalidate the queue */
183 q->eyec = 0xdead;
184
185 /* Have all waiting threads return an error */
186 while (q->thrs) {
187 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ));
188 CHECK_POSIX( pthread_cond_signal(&q->cond_pull) );
189 usleep(1000);
190
191 CHECK_POSIX( pthread_mutex_lock( &q->mtx ) );
192 ASSERT( ++loops < 20 ); /* detect infinite loops */
193 }
194
195 /* sanity check */
196 ASSERT(FD_IS_LIST_EMPTY(&q->list));
197
198 /* And destroy it */
199 CHECK_POSIX( pthread_mutex_unlock( &q->mtx ) );
200
201 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_pull ), );
202
203 CHECK_POSIX_DO( pthread_cond_destroy( &q->cond_push ), );
204
205 CHECK_POSIX_DO( pthread_mutex_destroy( &q->mtx ), );
206
207 free(q);
208 *queue = NULL;
209
210 return 0;
211}
212
213/* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */
214int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
215{
216 int loops = 0;
217
218 TRACE_ENTRY("%p %p %p", old, new, loc_update);
219 CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
220
221 CHECK_PARAMS( ! old->data );
222 if (new->high) {
223 TODO("Implement support for thresholds in fd_fifo_move...");
224 }
225
226 /* Update loc_update */
227 if (loc_update)
228 *loc_update = new;
229
230 /* Lock the queues */
231 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
232
233 CHECK_PARAMS_DO( (! old->thrs_push), {
234 pthread_mutex_unlock( &old->mtx );
235 return EINVAL;
236 } );
237
238 CHECK_POSIX( pthread_mutex_lock( &new->mtx ) );
239
240 /* Any waiting thread on the old queue returns an error */
241 old->eyec = 0xdead;
242 while (old->thrs) {
243 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ));
244 CHECK_POSIX( pthread_cond_signal( &old->cond_pull ) );
245 usleep(1000);
246
247 CHECK_POSIX( pthread_mutex_lock( &old->mtx ) );
248 ASSERT( loops < 20 ); /* detect infinite loops */
249 }
250
251 /* Move all data from old to new */
252 fd_list_move_end( &new->list, &old->list );
253 if (old->count && (!new->count)) {
254 CHECK_POSIX( pthread_cond_signal(&new->cond_pull) );
255 }
256 new->count += old->count;
257
258 /* Reset old */
259 old->count = 0;
260 old->eyec = FIFO_EYEC;
261
262 /* Merge the stats in the new queue */
263 new->total_items += old->total_items;
264 old->total_items = 0;
265
266 new->total_time.tv_nsec += old->total_time.tv_nsec;
267 new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
268 new->total_time.tv_nsec %= 1000000000;
269 old->total_time.tv_nsec = 0;
270 old->total_time.tv_sec = 0;
271
272 new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
273 new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
274 new->blocking_time.tv_nsec %= 1000000000;
275 old->blocking_time.tv_nsec = 0;
276 old->blocking_time.tv_sec = 0;
277
278 /* Unlock, we're done */
279 CHECK_POSIX( pthread_mutex_unlock( &new->mtx ) );
280 CHECK_POSIX( pthread_mutex_unlock( &old->mtx ) );
281
282 return 0;
283}
284
285/* Get the information on the queue */
286int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
287 struct timespec * total, struct timespec * blocking, struct timespec * last)
288{
289 TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
290
291 /* Check the parameters */
292 CHECK_PARAMS( CHECK_FIFO( queue ) );
293
294 /* lock the queue */
295 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
296
297 if (current_count)
298 *current_count = queue->count;
299
300 if (limit_count)
301 *limit_count = queue->max;
302
303 if (highest_count)
304 *highest_count = queue->highest_ever;
305
306 if (total_count)
307 *total_count = queue->total_items;
308
309 if (total)
310 memcpy(total, &queue->total_time, sizeof(struct timespec));
311
312 if (blocking)
313 memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
314
315 if (last)
316 memcpy(last, &queue->last_time, sizeof(struct timespec));
317
318 /* Unlock */
319 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
320
321 /* Done */
322 return 0;
323}
324
325
326/* alternate version with no error checking */
327int fd_fifo_length ( struct fifo * queue )
328{
329 if ( !CHECK_FIFO( queue ) )
330 return 0;
331
332 return queue->count; /* Let's hope it's read atomically, since we are not locking... */
333}
334
335/* Set the thresholds of the queue */
336int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
337{
338 TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
339
340 /* Check the parameters */
341 CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
342
343 /* lock the queue */
344 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
345
346 /* Save the values */
347 queue->high = high;
348 queue->low = low;
349 queue->data = data;
350 queue->h_cb = h_cb;
351 queue->l_cb = l_cb;
352
353 /* Unlock */
354 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
355
356 /* Done */
357 return 0;
358}
359
360
361/* This handler is called when a thread is blocked on a queue, and cancelled */
362static void fifo_cleanup_push(void * queue)
363{
364 struct fifo * q = (struct fifo *)queue;
365 TRACE_ENTRY( "%p", queue );
366
367 /* The thread has been cancelled, therefore it does not wait on the queue anymore */
368 q->thrs_push--;
369
370 /* Now unlock the queue, and we're done */
371 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
372
373 /* End of cleanup handler */
374 return;
375}
376
377
378/* Post a new item in the queue */
379int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max )
380{
381 struct fifo_item * new;
382 int call_cb = 0;
383 struct timespec posted_on, queued_on;
384
385 /* Get the timing of this call */
386 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &posted_on) );
387
388 /* lock the queue */
389 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
390
391 if ((!skip_max) && (queue->max)) {
392 while (queue->count >= queue->max) {
393 int ret = 0;
394
395 /* We have to wait for an item to be pulled */
396 queue->thrs_push++ ;
397 pthread_cleanup_push( fifo_cleanup_push, queue);
398 ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
399 pthread_cleanup_pop(0);
400 queue->thrs_push-- ;
401
402 ASSERT( ret == 0 );
403 }
404 }
405
406 /* Create a new list item */
407 CHECK_MALLOC_DO( new = malloc (sizeof (struct fifo_item)) , {
408 pthread_mutex_unlock( &queue->mtx );
409 return ENOMEM;
410 } );
411
412 fd_list_init(&new->item, *item);
413 *item = NULL;
414
415 /* Add the new item at the end */
416 fd_list_insert_before( &queue->list, &new->item);
417 queue->count++;
418 if (queue->highest_ever < queue->count)
419 queue->highest_ever = queue->count;
420 if (queue->high && ((queue->count % queue->high) == 0)) {
421 call_cb = 1;
422 queue->highest = queue->count;
423 }
424
425 /* store timing */
426 memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
427
428 /* update queue timing info "blocking time" */
429 {
430 long long blocked_ns;
431 CHECK_SYS( clock_gettime(CLOCK_REALTIME, &queued_on) );
432 blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
433 blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
434 blocked_ns += queue->blocking_time.tv_nsec;
435 queue->blocking_time.tv_sec += blocked_ns / 1000000000;
436 queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
437 }
438
439 /* Signal if threads are asleep */
440 if (queue->thrs > 0) {
441 CHECK_POSIX( pthread_cond_signal(&queue->cond_pull) );
442 }
443 if (queue->thrs_push > 0) {
444 /* cascade */
445 CHECK_POSIX( pthread_cond_signal(&queue->cond_push) );
446 }
447
448 /* Unlock */
449 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
450
451 /* Call high-watermark cb as needed */
452 if (call_cb && queue->h_cb)
453 (*queue->h_cb)(queue, &queue->data);
454
455 /* Done */
456 return 0;
457}
458
459/* Post a new item in the queue */
460int fd_fifo_post_int ( struct fifo * queue, void ** item )
461{
462 TRACE_ENTRY( "%p %p", queue, item );
463
464 /* Check the parameters */
465 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
466
467 return fd_fifo_post_internal ( queue,item, 0 );
468
469}
470
471/* Post a new item in the queue, not blocking */
472int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
473{
474 TRACE_ENTRY( "%p %p", queue, item );
475
476 /* Check the parameters */
477 CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
478
479 return fd_fifo_post_internal ( queue,item, 1 );
480
481}
482
483/* Pop the first item from the queue */
484static void * mq_pop(struct fifo * queue)
485{
486 void * ret = NULL;
487 struct fifo_item * fi;
488 struct timespec now;
489
490 ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
491
492 fi = (struct fifo_item *)(queue->list.next);
493 ret = fi->item.o;
494 fd_list_unlink(&fi->item);
495 queue->count--;
496 queue->total_items++;
497
498 /* Update the timings */
499 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), goto skip_timing );
500 {
501 long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
502 elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
503
504 queue->last_time.tv_sec = elapsed / 1000000000;
505 queue->last_time.tv_nsec = elapsed % 1000000000;
506
507 elapsed += queue->total_time.tv_nsec;
508 queue->total_time.tv_sec += elapsed / 1000000000;
509 queue->total_time.tv_nsec = elapsed % 1000000000;
510 }
511skip_timing:
512 free(fi);
513
514 if (queue->thrs_push) {
515 CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
516 }
517
518 return ret;
519}
520
521/* Check if the low watermark callback must be called. */
522static __inline__ int test_l_cb(struct fifo * queue)
523{
524 if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
525 return 0;
526
527 if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
528 queue->highest -= queue->high;
529 return 1;
530 }
531
532 return 0;
533}
534
535/* Try poping an item */
536int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
537{
538 int wouldblock = 0;
539 int call_cb = 0;
540
541 TRACE_ENTRY( "%p %p", queue, item );
542
543 /* Check the parameters */
544 CHECK_PARAMS( CHECK_FIFO( queue ) && item );
545
546 /* lock the queue */
547 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
548
549 /* Check queue status */
550 if (queue->count > 0) {
551got_item:
552 /* There are elements in the queue, so pick the first one */
553 *item = mq_pop(queue);
554 call_cb = test_l_cb(queue);
555 } else {
556 if (queue->thrs_push > 0) {
557 /* A thread is trying to push something, let's give it a chance */
558 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
559 CHECK_POSIX( pthread_cond_signal( &queue->cond_push ) );
560 usleep(1000);
561 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
562 if (queue->count > 0)
563 goto got_item;
564 }
565
566 wouldblock = 1;
567 *item = NULL;
568 }
569
570 /* Unlock */
571 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
572
573 /* Call low watermark callback as needed */
574 if (call_cb)
575 (*queue->l_cb)(queue, &queue->data);
576
577 /* Done */
578 return wouldblock ? EWOULDBLOCK : 0;
579}
580
581/* This handler is called when a thread is blocked on a queue, and cancelled */
582static void fifo_cleanup(void * queue)
583{
584 struct fifo * q = (struct fifo *)queue;
585 TRACE_ENTRY( "%p", queue );
586
587 /* The thread has been cancelled, therefore it does not wait on the queue anymore */
588 q->thrs--;
589
590 /* Now unlock the queue, and we're done */
591 CHECK_POSIX_DO( pthread_mutex_unlock( &q->mtx ), /* nothing */ );
592
593 /* End of cleanup handler */
594 return;
595}
596
597/* The internal function for fd_fifo_timedget and fd_fifo_get */
598static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
599{
600 int call_cb = 0;
601 int ret = 0;
602
603 /* Check the parameters */
604 CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
605
606 /* Initialize the return value */
607 *item = NULL;
608
609 /* lock the queue */
610 CHECK_POSIX( pthread_mutex_lock( &queue->mtx ) );
611
612awaken:
613 /* Check queue status */
614 if (!CHECK_FIFO( queue )) {
615 /* The queue is being destroyed */
616 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
617 TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
618 return EPIPE;
619 }
620
621 if (queue->count > 0) {
622 /* There are items in the queue, so pick the first one */
623 *item = mq_pop(queue);
624 call_cb = test_l_cb(queue);
625 } else {
626 /* We have to wait for a new item */
627 queue->thrs++ ;
628 pthread_cleanup_push( fifo_cleanup, queue);
629 if (istimed) {
630 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
631 } else {
632 ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
633 }
634 pthread_cleanup_pop(0);
635 queue->thrs-- ;
636 if (ret == 0)
637 goto awaken; /* test for spurious wake-ups */
638
639 /* otherwise (ETIMEDOUT / other error) just continue */
640 }
641
642 /* Unlock */
643 CHECK_POSIX( pthread_mutex_unlock( &queue->mtx ) );
644
645 /* Call low watermark callback as needed */
646 if (call_cb)
647 (*queue->l_cb)(queue, &queue->data);
648
649 /* Done */
650 return ret;
651}
652
653/* Get the next available item, block until there is one */
654int fd_fifo_get_int ( struct fifo * queue, void ** item )
655{
656 TRACE_ENTRY( "%p %p", queue, item );
657 return fifo_tget(queue, item, 0, NULL);
658}
659
660/* Get the next available item, block until there is one, or the timeout expires */
661int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
662{
663 TRACE_ENTRY( "%p %p %p", queue, item, abstime );
664 return fifo_tget(queue, item, 1, abstime);
665}
666
667/* Test if data is available in the queue, without pulling it */
668int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime )
669{
670 int ret = 0;
671 TRACE_ENTRY( "%p %p", queue, abstime );
672
673 CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
674
675 /* lock the queue */
676 CHECK_POSIX_DO( pthread_mutex_lock( &queue->mtx ), return -__ret__ );
677
678awaken:
679 ret = (queue->count > 0 ) ? queue->count : 0;
680 if ((ret == 0) && (abstime != NULL)) {
681 /* We have to wait for a new item */
682 queue->thrs++ ;
683 pthread_cleanup_push( fifo_cleanup, queue);
684 ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
685 pthread_cleanup_pop(0);
686 queue->thrs-- ;
687 if (ret == 0)
688 goto awaken; /* test for spurious wake-ups */
689
690 if (ret == ETIMEDOUT)
691 ret = 0;
692 else
693 ret = -ret;
694 }
695
696 /* Unlock */
697 CHECK_POSIX_DO( pthread_mutex_unlock( &queue->mtx ), return -__ret__ );
698
699 return ret;
700}