blob: 93df0657a6307c34a3d4308ce00df09f19e55258 [file] [log] [blame]
Brian Waters13d96012017-12-08 16:53:31 -06001/*********************************************************************************************************
2* Software License Agreement (BSD License) *
3* Author: Sebastien Decugis <sdecugis@freediameter.net> *
4* *
5* Copyright (c) 2015, WIDE Project and NICT *
6* All rights reserved. *
7* *
8* Redistribution and use of this software in source and binary forms, with or without modification, are *
9* permitted provided that the following conditions are met: *
10* *
11* * Redistributions of source code must retain the above *
12* copyright notice, this list of conditions and the *
13* following disclaimer. *
14* *
15* * Redistributions in binary form must reproduce the above *
16* copyright notice, this list of conditions and the *
17* following disclaimer in the documentation and/or other *
18* materials provided with the distribution. *
19* *
20* * Neither the name of the WIDE Project or NICT nor the *
21* names of its contributors may be used to endorse or *
22* promote products derived from this software without *
23* specific prior written permission of WIDE Project and *
24* NICT. *
25* *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34*********************************************************************************************************/
35
36#include "fdcore-internal.h"
37
38/********************************************************************************/
39/* First part : handling the extensions callbacks */
40/********************************************************************************/
41
42/* Lists of the callbacks, and locks to protect them */
43static pthread_rwlock_t rt_fwd_lock = PTHREAD_RWLOCK_INITIALIZER;
44static struct fd_list rt_fwd_list = FD_LIST_INITIALIZER_O(rt_fwd_list, &rt_fwd_lock);
45
46static pthread_rwlock_t rt_out_lock = PTHREAD_RWLOCK_INITIALIZER;
47static struct fd_list rt_out_list = FD_LIST_INITIALIZER_O(rt_out_list, &rt_out_lock);
48
49/* Items in the lists are the same */
50struct rt_hdl {
51 struct fd_list chain; /* link in the rt_fwd_list or rt_out_list */
52 void * cbdata; /* the registered data */
53 union {
54 int order; /* This value is used to sort the list */
55 int dir; /* It is the direction for FWD handlers */
56 int prio; /* and the priority for OUT handlers */
57 };
58 union {
59 int (*rt_fwd_cb)(void * cbdata, struct msg ** msg);
60 int (*rt_out_cb)(void * cbdata, struct msg ** msg, struct fd_list * candidates);
61 };
62};
63
64/* Add a new entry in the list */
65static int add_ordered(struct rt_hdl * new, struct fd_list * list)
66{
67 /* The list is ordered by prio parameter */
68 struct fd_list * li;
69
70 CHECK_POSIX( pthread_rwlock_wrlock(list->o) );
71
72 for (li = list->next; li != list; li = li->next) {
73 struct rt_hdl * h = (struct rt_hdl *) li;
74 if (new->order <= h->order)
75 break;
76 }
77
78 fd_list_insert_before(li, &new->chain);
79
80 CHECK_POSIX( pthread_rwlock_unlock(list->o) );
81
82 return 0;
83}
84
85/* Register a new FWD callback */
86int fd_rt_fwd_register ( int (*rt_fwd_cb)(void * cbdata, struct msg ** msg), void * cbdata, enum fd_rt_fwd_dir dir, struct fd_rt_fwd_hdl ** handler )
87{
88 struct rt_hdl * new;
89
90 TRACE_ENTRY("%p %p %d %p", rt_fwd_cb, cbdata, dir, handler);
91 CHECK_PARAMS( rt_fwd_cb );
92 CHECK_PARAMS( (dir >= RT_FWD_REQ) && ( dir <= RT_FWD_ANS) );
93
94 /* Create a new container */
95 CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
96 memset(new, 0, sizeof(struct rt_hdl));
97
98 /* Write the content */
99 fd_list_init(&new->chain, NULL);
100 new->cbdata = cbdata;
101 new->dir = dir;
102 new->rt_fwd_cb = rt_fwd_cb;
103
104 /* Save this in the list */
105 CHECK_FCT( add_ordered(new, &rt_fwd_list) );
106
107 /* Give it back to the extension if needed */
108 if (handler)
109 *handler = (void *)new;
110
111 return 0;
112}
113
114/* Remove it */
115int fd_rt_fwd_unregister ( struct fd_rt_fwd_hdl * handler, void ** cbdata )
116{
117 struct rt_hdl * del;
118 TRACE_ENTRY( "%p %p", handler, cbdata);
119 CHECK_PARAMS( handler );
120
121 del = (struct rt_hdl *)handler;
122 CHECK_PARAMS( del->chain.head == &rt_fwd_list );
123
124 /* Unlink */
125 CHECK_POSIX( pthread_rwlock_wrlock(&rt_fwd_lock) );
126 fd_list_unlink(&del->chain);
127 CHECK_POSIX( pthread_rwlock_unlock(&rt_fwd_lock) );
128
129 if (cbdata)
130 *cbdata = del->cbdata;
131
132 free(del);
133 return 0;
134}
135
136/* Register a new OUT callback */
137int fd_rt_out_register ( int (*rt_out_cb)(void * cbdata, struct msg ** pmsg, struct fd_list * candidates), void * cbdata, int priority, struct fd_rt_out_hdl ** handler )
138{
139 struct rt_hdl * new;
140
141 TRACE_ENTRY("%p %p %d %p", rt_out_cb, cbdata, priority, handler);
142 CHECK_PARAMS( rt_out_cb );
143
144 /* Create a new container */
145 CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
146 memset(new, 0, sizeof(struct rt_hdl));
147
148 /* Write the content */
149 fd_list_init(&new->chain, NULL);
150 new->cbdata = cbdata;
151 new->prio = priority;
152 new->rt_out_cb = rt_out_cb;
153
154 /* Save this in the list */
155 CHECK_FCT( add_ordered(new, &rt_out_list) );
156
157 /* Give it back to the extension if needed */
158 if (handler)
159 *handler = (void *)new;
160
161 return 0;
162}
163
164/* Remove it */
165int fd_rt_out_unregister ( struct fd_rt_out_hdl * handler, void ** cbdata )
166{
167 struct rt_hdl * del;
168 TRACE_ENTRY( "%p %p", handler, cbdata);
169 CHECK_PARAMS( handler );
170
171 del = (struct rt_hdl *)handler;
172 CHECK_PARAMS( del->chain.head == &rt_out_list );
173
174 /* Unlink */
175 CHECK_POSIX( pthread_rwlock_wrlock(&rt_out_lock) );
176 fd_list_unlink(&del->chain);
177 CHECK_POSIX( pthread_rwlock_unlock(&rt_out_lock) );
178
179 if (cbdata)
180 *cbdata = del->cbdata;
181
182 free(del);
183 return 0;
184}
185
186/********************************************************************************/
187/* Some default OUT routing callbacks */
188/********************************************************************************/
189
190/* Prevent sending to peers that do not support the message application */
191static int dont_send_if_no_common_app(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
192{
193 struct msg * msg = *pmsg;
194 struct fd_list * li;
195 struct msg_hdr * hdr;
196
197 TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
198 CHECK_PARAMS(msg && candidates);
199
200 CHECK_FCT( fd_msg_hdr(msg, &hdr) );
201
202 /* For Base Diameter Protocol, every peer is supposed to support it, so skip */
203 if (hdr->msg_appl == 0)
204 return 0;
205
206 /* Otherwise, check that the peers support the application */
207 for (li = candidates->next; li != candidates; li = li->next) {
208 struct rtd_candidate *c = (struct rtd_candidate *) li;
209 struct fd_peer * peer;
210 struct fd_app *found;
211 CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
212 if (peer && !peer->p_hdr.info.runtime.pir_relay) {
213 /* Check if the remote peer advertised the message's appli */
214 CHECK_FCT( fd_app_check(&peer->p_hdr.info.runtime.pir_apps, hdr->msg_appl, &found) );
215 if (!found)
216 c->score += FD_SCORE_NO_DELIVERY;
217 }
218 }
219
220 return 0;
221}
222
223/* Detect if the Destination-Host and Destination-Realm match the peer */
224static int score_destination_avp(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
225{
226 struct msg * msg = *pmsg;
227 struct fd_list * li;
228 struct avp * avp;
229 union avp_value *dh = NULL, *dr = NULL;
230
231 TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
232 CHECK_PARAMS(msg && candidates);
233
234 /* Search the Destination-Host and Destination-Realm AVPs -- we could also use fd_msg_search_avp here, but this one is slightly more efficient */
235 CHECK_FCT( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL) );
236 while (avp) {
237 struct avp_hdr * ahdr;
238 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
239
240 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
241 switch (ahdr->avp_code) {
242 case AC_DESTINATION_HOST:
243 /* Parse this AVP */
244 CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
245 ASSERT( ahdr->avp_value );
246 dh = ahdr->avp_value;
247 break;
248
249 case AC_DESTINATION_REALM:
250 /* Parse this AVP */
251 CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
252 ASSERT( ahdr->avp_value );
253 dr = ahdr->avp_value;
254 break;
255 }
256 }
257
258 if (dh && dr)
259 break;
260
261 /* Go to next AVP */
262 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
263 }
264
265 /* Now, check each candidate against these AVP values */
266 for (li = candidates->next; li != candidates; li = li->next) {
267 struct rtd_candidate *c = (struct rtd_candidate *) li;
268
269 #if 0 /* this is actually useless since the sending process will also ensure that the peer is still available */
270 struct fd_peer * peer;
271 /* Since the candidates list comes from the peers list, we do not have any issue with upper/lower case to find the peer object */
272 CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
273 if (!peer)
274 continue; /* it has been deleted since the candidate list was generated; avoid sending to this one in that case. */
275 #endif /* 0 */
276
277 /* In the AVPs, the value comes from the network, so let's be case permissive */
278 if (dh && !fd_os_almostcasesrch(dh->os.data, dh->os.len, c->diamid, c->diamidlen, NULL) ) {
279 /* The candidate is the Destination-Host */
280 c->score += FD_SCORE_FINALDEST;
281 } else {
282 if (dr && !fd_os_almostcasesrch(dr->os.data, dr->os.len, c->realm, c->realmlen, NULL) ) {
283 /* The candidate's realm matchs the Destination-Realm */
284 c->score += FD_SCORE_REALM;
285 }
286 }
287 }
288
289 return 0;
290}
291
292/********************************************************************************/
293/* Helper functions */
294/********************************************************************************/
295
296/* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */
297static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx)
298{
299 int i;
300
301 TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx);
302 CHECK_PARAMS_DO( un && excl_idx && at_idx, return );
303
304 *excl_idx = 0;
305 *at_idx = 0;
306
307 /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */
308 for (i = 0; i < un->os.len; i++) {
309 /* The '!' marks the decorated NAI */
310 if ( un->os.data[i] == (unsigned char) '!' ) {
311 if (!*excl_idx)
312 *excl_idx = i;
313 continue;
314 }
315 /* If we reach the realm part, we can stop */
316 if ( un->os.data[i] == (unsigned char) '@' ) {
317 *at_idx = i;
318 break;
319 }
320 /* Stop if we find a \0 in the middle */
321 if ( un->os.data[i] == 0 ) {
322 return;
323 }
324 /* Skip escaped characters */
325 if ( un->os.data[i] == (unsigned char) '\\' ) {
326 i++;
327 continue;
328 }
329 }
330
331 return;
332}
333
334/* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, RFC5729 */
335/* Create new User-Name and Destination-Realm values */
336static int process_decorated_NAI(int * was_nai, union avp_value * un, union avp_value * dr)
337{
338 int at_idx, sep_idx;
339 unsigned char * old_un;
340 TRACE_ENTRY("%p %p %p", was_nai, un, dr);
341 CHECK_PARAMS(was_nai && un && dr);
342
343 /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */
344 old_un = un->os.data;
345
346 /* Search the positions of the first '!' and the '@' in the string */
347 nai_get_indexes(un, &sep_idx, &at_idx);
348 if ((!sep_idx) || (sep_idx > at_idx) || !fd_os_is_valid_DiameterIdentity(old_un, sep_idx /* this is the new realm part */)) {
349 *was_nai = 0;
350 return 0;
351 }
352
353 *was_nai = 1;
354
355 /* Create the new User-Name value */
356 CHECK_MALLOC( un->os.data = malloc( at_idx ) );
357 memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */
358 memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */
359
360 /* Create the new Destination-Realm value */
361 CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) );
362 memcpy( dr->os.data, old_un, sep_idx );
363 dr->os.len = sep_idx;
364
365 TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)",
366 (int)un->os.len, old_un,
367 (int)at_idx, un->os.data,
368 (int)dr->os.len, dr->os.data);
369
370 un->os.len = at_idx;
371 free(old_un);
372
373 return 0;
374}
375
376
377/* Function to return an error to an incoming request */
378static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp)
379{
380 struct fd_peer * peer;
381 int is_loc = 0;
382
383 /* Get the source of the message */
384 {
385 DiamId_t id;
386 size_t idlen;
387 CHECK_FCT( fd_msg_source_get( *pmsg, &id, &idlen ) );
388
389 if (id == NULL) {
390 is_loc = 1; /* The message was issued locally */
391 } else {
392
393 /* Search the peer with this id */
394 CHECK_FCT( fd_peer_getbyid( id, idlen, 0, (void *)&peer ) );
395
396 if (!peer) {
397 char buf[256];
398 snprintf(buf, sizeof(buf), "Unable to send error '%s' to deleted peer '%s' in reply to this message.", error_code, id);
399 fd_hook_call(HOOK_MESSAGE_DROPPED, *pmsg, NULL, buf, fd_msg_pmdl_get(*pmsg));
400 fd_msg_free(*pmsg);
401 *pmsg = NULL;
402 return 0;
403 }
404 }
405 }
406
407 /* Create the error message */
408 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) );
409
410 /* Set the error code */
411 CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) );
412
413 /* Send the answer */
414 if (is_loc) {
415 CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
416 } else {
417 CHECK_FCT( fd_out_send(pmsg, NULL, peer, 1) );
418 }
419
420 /* Done */
421 return 0;
422}
423
424
425/****************************************************************************/
426/* Second part : threads moving messages in the daemon */
427/****************************************************************************/
428
429/* The DISPATCH message processing */
430static int msg_dispatch(struct msg * msg)
431{
432 struct msg_hdr * hdr;
433 int is_req = 0;
434 struct session * sess;
435 enum disp_action action;
436 char * ec = NULL;
437 char * em = NULL;
438 struct msg *msgptr = msg, *error = NULL;
439
440 /* Read the message header */
441 CHECK_FCT( fd_msg_hdr(msg, &hdr) );
442 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
443
444 /* Note: if the message is for local delivery, we should test for duplicate
445 (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
446
447 /* At this point, we need to understand the message content, so parse it */
448 CHECK_FCT_DO( fd_msg_parse_or_error( &msgptr, &error ),
449 {
450 int rescue = 0;
451 if (__ret__ != EBADMSG) {
452 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Error while parsing received answer", fd_msg_pmdl_get(msgptr));
453 fd_msg_free(msgptr);
454 } else {
455 if (!msgptr) {
456 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR2, error, NULL, NULL, fd_msg_pmdl_get(error));
457 /* error now contains the answer message to send back */
458 CHECK_FCT( fd_fifo_post(fd_g_outgoing, &error) );
459 } else if (!error) {
460 /* We have received an invalid answer to our query */
461 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Received answer failed the dictionary / rules parsing", fd_msg_pmdl_get(msgptr));
462 fd_msg_free(msgptr);
463 } else {
464 /* We will pass the invalid received error to the application */
465 rescue = 1;
466 }
467 }
468 if (!rescue)
469 return 0; /* We are done with this message, go to the next */
470 } );
471
472 /* First, if the original request was registered with a callback and we receive the answer, call it. */
473 if ( ! is_req ) {
474 struct msg * qry;
475 void (*anscb)(void *, struct msg **) = NULL;
476 void * data = NULL;
477
478 /* Retrieve the corresponding query */
479 CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
480
481 /* Retrieve any registered handler */
482 CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) );
483
484 /* If a callback was registered, pass the message to it */
485 if (anscb != NULL) {
486
487 TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
488 (*anscb)(data, &msgptr);
489
490 /* If the message is processed, we're done */
491 if (msgptr == NULL) {
492 return 0;
493 }
494
495 /* otherwise continue the dispatching --hoping that the anscb callback did not mess with our message :) */
496 }
497 }
498
499 /* Retrieve the session of the message */
500 CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, msgptr, &sess, NULL) );
501
502 /* Now, call any callback registered for the message */
503 CHECK_FCT( fd_msg_dispatch ( &msgptr, sess, &action, &ec, &em, &error) );
504
505 /* Now, act depending on msg and action and ec */
506 if (msgptr) {
507 switch ( action ) {
508 case DISP_ACT_CONT:
509 /* No callback has handled the message, let's reply with a generic error or relay it */
510 if (!fd_g_config->cnf_flags.no_fwd) {
511 /* requeue to fd_g_outgoing */
512 fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
513 CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
514 break;
515 }
516 /* We don't relay => reply error */
517 em = "The message was not handled by any extension callback";
518 ec = "DIAMETER_COMMAND_UNSUPPORTED";
519 /* and continue as if an error occurred... */
520 case DISP_ACT_ERROR:
521 /* We have a problem with delivering the message */
522 if (ec == NULL) {
523 ec = "DIAMETER_UNABLE_TO_COMPLY";
524 }
525
526 if (!is_req) {
527 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Internal error: Answer received to locally issued request, but not handled by any handler.", fd_msg_pmdl_get(msgptr));
528 fd_msg_free(msgptr);
529 break;
530 }
531
532 /* Create an answer with the error code and message */
533 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msgptr, 0 ) );
534 CHECK_FCT( fd_msg_rescode_set(msgptr, ec, em, NULL, 1 ) );
535
536 case DISP_ACT_SEND:
537 /* Now, send the message */
538 CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
539 }
540 } else if (em) {
541 fd_hook_call(HOOK_MESSAGE_DROPPED, error, NULL, em, fd_msg_pmdl_get(error));
542 fd_msg_free(error);
543 }
544
545 /* We're done with dispatching this message */
546 return 0;
547}
548
549/* The ROUTING-IN message processing */
550static int msg_rt_in(struct msg * msg)
551{
552 struct msg_hdr * hdr;
553 int is_req = 0;
554 int is_err = 0;
555 DiamId_t qry_src = NULL;
556 struct msg *msgptr = msg;
557
558 /* Read the message header */
559 CHECK_FCT( fd_msg_hdr(msg, &hdr) );
560 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
561 is_err = hdr->msg_flags & CMD_FLAG_ERROR;
562
563 /* Handle incorrect bits */
564 if (is_req && is_err) {
565 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "R & E bits were set", fd_msg_pmdl_get(msgptr));
566 CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) );
567 return 0;
568 }
569
570 /* If it is a request, we must analyze its content to decide what we do with it */
571 if (is_req) {
572 struct avp * avp, *un = NULL;
573 union avp_value * un_val = NULL, *dr_val = NULL;
574 enum status { UNKNOWN, YES, NO };
575 /* Are we Destination-Host? */
576 enum status is_dest_host = UNKNOWN;
577 /* Are we Destination-Realm? */
578 enum status is_dest_realm = UNKNOWN;
579 /* Do we support the application of the message? */
580 enum status is_local_app = UNKNOWN;
581
582 /* Check if we have local support for the message application */
583 if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) {
584 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Received a routable message with application id 0 or " _stringize(AI_RELAY) " (relay)", fd_msg_pmdl_get(msgptr));
585 CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) );
586 return 0;
587 } else {
588 struct fd_app * app;
589 CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) );
590 is_local_app = (app ? YES : NO);
591 }
592
593 /* Parse the message for Dest-Host, Dest-Realm, and Route-Record */
594 CHECK_FCT( fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL) );
595 while (avp) {
596 struct avp_hdr * ahdr;
597 struct fd_pei error_info;
598 int ret;
599
600 memset(&error_info, 0, sizeof(struct fd_pei));
601
602 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
603
604 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
605 switch (ahdr->avp_code) {
606 case AC_DESTINATION_HOST:
607 /* Parse this AVP */
608 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
609 {
610 if (error_info.pei_errcode) {
611 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
612 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
613 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
614 return 0;
615 } else {
616 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Host AVP", fd_msg_pmdl_get(msgptr));
617 return ret;
618 }
619 } );
620 ASSERT( ahdr->avp_value );
621 /* Compare the Destination-Host AVP of the message with our identity */
622 if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
623 is_dest_host = YES;
624 } else {
625 is_dest_host = NO;
626 }
627 break;
628
629 case AC_DESTINATION_REALM:
630 /* Parse this AVP */
631 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
632 {
633 if (error_info.pei_errcode) {
634 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
635 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
636 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
637 return 0;
638 } else {
639 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Realm AVP", fd_msg_pmdl_get(msgptr));
640 return ret;
641 }
642 } );
643 ASSERT( ahdr->avp_value );
644 dr_val = ahdr->avp_value;
645 /* Compare the Destination-Realm AVP of the message with our identity */
646 if (!fd_os_almostcasesrch(dr_val->os.data, dr_val->os.len, fd_g_config->cnf_diamrlm, fd_g_config->cnf_diamrlm_len, NULL)) {
647 is_dest_realm = YES;
648 } else {
649 is_dest_realm = NO;
650 }
651 break;
652
653 /* we also use User-Name for decorated NAI */
654 case AC_USER_NAME:
655 /* Parse this AVP */
656 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
657 {
658 if (error_info.pei_errcode) {
659 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
660 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
661 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
662 return 0;
663 } else {
664 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing User-Name AVP", fd_msg_pmdl_get(msgptr));
665 return ret;
666 }
667 } );
668 ASSERT( ahdr->avp_value );
669 un = avp;
670 un_val = ahdr->avp_value;
671 break;
672
673 case AC_ROUTE_RECORD:
674 /* Parse this AVP */
675 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
676 {
677 if (error_info.pei_errcode) {
678 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
679 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
680 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
681 return 0;
682 } else {
683 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Route-Record AVP", fd_msg_pmdl_get(msgptr));
684 return ret;
685 }
686 } );
687 ASSERT( ahdr->avp_value );
688 /* Is this our own name ? */
689 if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
690 /* Yes: then we must return DIAMETER_LOOP_DETECTED according to Diameter RFC */
691 char * error = "DIAMETER_LOOP_DETECTED";
692 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error, fd_msg_pmdl_get(msgptr));
693 CHECK_FCT( return_error( &msgptr, error, NULL, NULL) );
694 return 0;
695 }
696 break;
697
698
699 }
700 }
701
702 /* Stop when we found all 3 AVPs -- they are supposed to be at the beginning of the message, so this should be fast */
703 if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un)
704 break;
705
706 /* Go to next AVP */
707 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
708 }
709
710 /* OK, now decide what we do with the request */
711
712 /* Handle the missing routing AVPs first */
713 if ( is_dest_realm == UNKNOWN ) {
714 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", fd_msg_pmdl_get(msgptr));
715 CHECK_FCT( return_error( &msgptr, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) );
716 return 0;
717 }
718
719 /* If we are listed as Destination-Host */
720 if (is_dest_host == YES) {
721 if (is_local_app == YES) {
722 /* Ok, give the message to the dispatch thread */
723 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
724 CHECK_FCT( fd_fifo_post(fd_g_local, &msgptr) );
725 } else {
726 /* We don't support the application, reply an error */
727 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
728 CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
729 }
730 return 0;
731 }
732
733 /* If the message is explicitely for someone else */
734 if ((is_dest_host == NO) || (is_dest_realm == NO)) {
735 if (fd_g_config->cnf_flags.no_fwd) {
736 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Message for another realm/host", fd_msg_pmdl_get(msgptr));
737 CHECK_FCT( return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "I am not a Diameter agent", NULL) );
738 return 0;
739 }
740 } else {
741 /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */
742 int is_nai = 0;
743
744 /* test for decorated NAI (RFC5729 section 4.4) */
745 /* Handle the decorated NAI */
746 if (un_val) {
747 CHECK_FCT_DO( process_decorated_NAI(&is_nai, un_val, dr_val),
748 {
749 /* If the process failed, we assume it is because of the AVP format */
750 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Failed to process decorated NAI", fd_msg_pmdl_get(msgptr));
751 CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) );
752 return 0;
753 } );
754 }
755
756 if (is_nai) {
757 /* We have transformed the AVP, now submit it again in the queue */
758 CHECK_FCT(fd_fifo_post(fd_g_incoming, &msgptr) );
759 return 0;
760 }
761
762 if (is_local_app == YES) {
763 /* Handle localy since we are able to */
764 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
765 CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
766 return 0;
767 }
768
769 if (fd_g_config->cnf_flags.no_fwd) {
770 /* We return an error */
771 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
772 CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
773 return 0;
774 }
775 }
776
777 /* From that point, for requests, we will call the registered callbacks, then forward to another peer */
778
779 } else {
780 /* The message is an answer */
781 struct msg * qry;
782
783 /* Retrieve the corresponding query and its origin */
784 CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
785 CHECK_FCT( fd_msg_source_get( qry, &qry_src, NULL ) );
786
787 if ((!qry_src) && (!is_err)) {
788 /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */
789 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
790 CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
791 return 0;
792 }
793
794 /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */
795 }
796
797 /* Call all registered callbacks for this message */
798 {
799 struct fd_list * li;
800
801 CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) );
802 pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock );
803
804 /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */
805 for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msgptr && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) {
806 struct rt_hdl * rh = (struct rt_hdl *)li;
807 int ret;
808
809 if (is_req && (rh->dir > RT_FWD_ALL))
810 break;
811 if ((!is_req) && (rh->dir < RT_FWD_ALL))
812 break;
813
814 /* Ok, call this cb */
815 TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msgptr, rh->rt_fwd_cb);
816 CHECK_FCT_DO( ret = (*rh->rt_fwd_cb)(rh->cbdata, &msgptr),
817 {
818 char buf[256];
819 snprintf(buf, sizeof(buf), "A FWD routing callback returned an error: %s", strerror(ret));
820 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
821 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
822 fd_msg_free(msgptr);
823 msgptr = NULL;
824 break;
825 } );
826 }
827
828 pthread_cleanup_pop(0);
829 CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) );
830
831 /* If a callback has handled the message, we stop now */
832 if (!msgptr)
833 return 0;
834 }
835
836 /* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */
837 if (is_req || qry_src) {
838 fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
839 CHECK_FCT(fd_fifo_post(fd_g_outgoing, &msgptr) );
840 } else {
841 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
842 CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
843 }
844
845 /* We're done with this message */
846 return 0;
847}
848
849
850/* The ROUTING-OUT message processing */
851static int msg_rt_out(struct msg * msg)
852{
853 struct rt_data * rtd = NULL;
854 struct msg_hdr * hdr;
855 int is_req = 0;
856 int ret;
857 struct fd_list * li, *candidates;
858 struct avp * avp;
859 struct rtd_candidate * c;
860 struct msg *msgptr = msg;
861 DiamId_t qry_src = NULL;
862 size_t qry_src_len = 0;
863
864 /* Read the message header */
865 CHECK_FCT( fd_msg_hdr(msgptr, &hdr) );
866 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
867
868 /* For answers, the routing is very easy */
869 if ( ! is_req ) {
870 struct msg * qry;
871 struct msg_hdr * qry_hdr;
872 struct fd_peer * peer = NULL;
873
874 /* Retrieve the corresponding query and its origin */
875 CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
876 CHECK_FCT( fd_msg_source_get( qry, &qry_src, &qry_src_len ) );
877
878 ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */
879
880 /* Find the peer corresponding to this name */
881 CHECK_FCT( fd_peer_getbyid( qry_src, qry_src_len, 0, (void *) &peer ) );
882 if (fd_peer_getstate(peer) != STATE_OPEN && fd_peer_getstate(peer) != STATE_CLOSING_GRACE) {
883 char buf[128];
884 snprintf(buf, sizeof(buf), "Unable to forward answer to deleted / closed peer '%s'.", qry_src);
885 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
886 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
887 fd_msg_free(msgptr);
888 return 0;
889 }
890
891 /* We must restore the hop-by-hop id */
892 CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) );
893 hdr->msg_hbhid = qry_hdr->msg_hbhid;
894
895 /* Push the message into this peer */
896 CHECK_FCT( fd_out_send(&msgptr, NULL, peer, 1) );
897
898 /* We're done with this answer */
899 return 0;
900 }
901
902 /* From that point, the message is a request */
903 CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) );
904 /* if qry_src != NULL, this message is relayed, otherwise it is locally issued */
905
906 /* Get the routing data out of the message if any (in case of re-transmit) */
907 CHECK_FCT( fd_msg_rt_get ( msgptr, &rtd ) );
908
909 /* If there is no routing data already, let's create it */
910 if (rtd == NULL) {
911 CHECK_FCT( fd_rtd_init(&rtd) );
912
913 /* Add all peers currently in OPEN state */
914 CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) );
915 for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
916 struct fd_peer * p = (struct fd_peer *)li->o;
917 CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd,
918 p->p_hdr.info.pi_diamid,
919 p->p_hdr.info.pi_diamidlen,
920 p->p_hdr.info.runtime.pir_realm,
921 p->p_hdr.info.runtime.pir_realmlen),
922 { CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } );
923 }
924 CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
925
926 /* Now let's remove all peers from the Route-Records */
927 CHECK_FCT( fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL) );
928 while (avp) {
929 struct avp_hdr * ahdr;
930 struct fd_pei error_info;
931 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
932
933 if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
934 /* Parse this AVP */
935 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
936 {
937 if (error_info.pei_errcode) {
938 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
939 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
940 return 0;
941 } else {
942 return ret;
943 }
944 } );
945 ASSERT( ahdr->avp_value );
946 /* Remove this value from the list. We don't need to pay special attention to the contents here. */
947 fd_rtd_candidate_del(rtd, ahdr->avp_value->os.data, ahdr->avp_value->os.len);
948 }
949
950 /* Go to next AVP */
951 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
952 }
953
954 /* Save the routing information in the message */
955 CHECK_FCT( fd_msg_rt_associate ( msgptr, rtd ) );
956 }
957
958 /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? -- TODO */
959
960 /* Ok, we have our list in rtd now, let's (re)initialize the scores */
961 fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI);
962
963 /* Pass the list to registered callbacks (even if it is empty list) */
964 {
965 CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) );
966 pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
967
968 /* We call the cb by reverse priority order */
969 for ( li = rt_out_list.prev ; (msgptr != NULL) && (li != &rt_out_list) ; li = li->prev ) {
970 struct rt_hdl * rh = (struct rt_hdl *)li;
971
972 TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msgptr, rh->rt_out_cb, rh->prio);
973 CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, &msgptr, candidates),
974 {
975 char buf[256];
976 snprintf(buf, sizeof(buf), "An OUT routing callback returned an error: %s", strerror(ret));
977 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
978 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
979 fd_msg_free(msgptr);
980 msgptr = NULL;
981 } );
982 }
983
984 pthread_cleanup_pop(0);
985 CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) );
986
987 /* If an error occurred or the callback disposed of the message, go to next message */
988 if (! msgptr) {
989 return 0;
990 }
991 }
992
993 /* Order the candidate peers by score attributed by the callbacks */
994 CHECK_FCT( fd_rtd_candidate_reorder(candidates) );
995
996 /* Now try sending the message */
997 for (li = candidates->prev; li != candidates; li = li->prev) {
998 struct fd_peer * peer;
999
1000 c = (struct rtd_candidate *) li;
1001
1002 /* Stop when we have reached the end of valid candidates */
1003 if (c->score < 0)
1004 break;
1005
1006 /* Search for the peer */
1007 CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
1008
1009 if (fd_peer_getstate(peer) == STATE_OPEN) {
1010 /* Send to this one */
1011 CHECK_FCT_DO( fd_out_send(&msgptr, NULL, peer, 1), continue );
1012
1013 /* If the sending was successful */
1014 break;
1015 }
1016 }
1017
1018 /* If the message has not been sent, return an error */
1019 if (msgptr) {
1020 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "No remaining suitable candidate to route the message to", fd_msg_pmdl_get(msgptr));
1021 return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
1022 }
1023
1024 /* We're done with this message */
1025
1026 return 0;
1027}
1028
1029
1030/********************************************************************************/
1031/* Management of the threads */
1032/********************************************************************************/
1033
/* Note: one routing-in thread and one routing-out thread are created, together with
   fd_g_config->cnf_dispthr dispatch threads (see fd_rtdisp_init).
   We could improve the scalability by using the threshold feature of the queues
   to create additional threads if a queue is filling up, or at least by giving a configurable
   number of threads of each kind.
 */
1039
/* Order given to the processing threads; it is read and written with order_state_lock held */
static enum {
	RUN  = 0,	/* keep processing messages */
	STOP = 1	/* leave the processing loop at the next check */
} order_val = RUN;

/* Protects order_val and the state values reported by the threads */
static pthread_mutex_t order_state_lock = PTHREAD_MUTEX_INITIALIZER;
1043
/* Status reported by each processing thread.
   NOTRUNNING is 0, which is also the value of the entries of the calloc'ed dispatch state array. */
enum thread_state {
	NOTRUNNING = 0,
	RUNNING    = 1
};
1046static void cleanup_state(void * state_loc)
1047{
1048 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1049 *(enum thread_state *)state_loc = NOTRUNNING;
1050 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1051}
1052
1053/* This is the common thread code (same for routing and dispatching) */
1054static void * process_thr(void * arg, int (*action_cb)(struct msg * msg), struct fifo * queue, char * action_name)
1055{
1056 TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name);
1057
1058 /* Set the thread name */
1059 {
1060 char buf[48];
1061 snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg);
1062 fd_log_threadname ( buf );
1063 }
1064
1065 /* The thread reports its status when canceled */
1066 CHECK_PARAMS_DO(arg, return NULL);
1067 pthread_cleanup_push( cleanup_state, arg );
1068
1069 /* Mark the thread running */
1070 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1071 *(enum thread_state *)arg = RUNNING;
1072 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1073
1074 do {
1075 struct msg * msg;
1076
1077 /* Test the current order */
1078 {
1079 int must_stop;
1080 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), { ASSERT(0); } ); /* we lock to flush the caches */
1081 must_stop = (order_val == STOP);
1082 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), { ASSERT(0); } );
1083 if (must_stop)
1084 goto end;
1085
1086 pthread_testcancel();
1087 }
1088
1089 /* Ok, we are allowed to run */
1090
1091 /* Get the next message from the queue */
1092 {
1093 int ret;
1094 struct timespec ts;
1095
1096 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), goto fatal_error );
1097 ts.tv_sec += 1;
1098
1099 ret = fd_fifo_timedget ( queue, &msg, &ts );
1100 if (ret == ETIMEDOUT)
1101 /* loop, check if the thread must stop now */
1102 continue;
1103 if (ret == EPIPE)
1104 /* The queue was destroyed, we are probably exiting */
1105 goto end;
1106
1107 /* check if another error occurred */
1108 CHECK_FCT_DO( ret, goto fatal_error );
1109 }
1110
1111 LOG_A("%s: Picked next message", action_name);
1112
1113 /* Now process the message */
1114 CHECK_FCT_DO( (*action_cb)(msg), goto fatal_error);
1115
1116 /* We're done with this message */
1117
1118 } while (1);
1119
1120fatal_error:
1121 TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name);
1122 CHECK_FCT_DO(fd_core_shutdown(), );
1123
1124end:
1125 ; /* noop so that we get rid of "label at end of compund statement" warning */
1126 /* Mark the thread as terminated */
1127 pthread_cleanup_pop(1);
1128 return NULL;
1129}
1130
1131/* The dispatch thread */
1132static void * dispatch_thr(void * arg)
1133{
1134 return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch");
1135}
1136
1137/* The (routing-in) thread -- see description in freeDiameter.h */
1138static void * routing_in_thr(void * arg)
1139{
1140 return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN");
1141}
1142
1143/* The (routing-out) thread -- see description in freeDiameter.h */
1144static void * routing_out_thr(void * arg)
1145{
1146 return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT");
1147}
1148
1149
1150/********************************************************************************/
1151/* The functions for the other files */
1152/********************************************************************************/
1153
1154static pthread_t * dispatch = NULL;
1155static enum thread_state * disp_state = NULL;
1156
1157/* Later: make this more dynamic */
1158static pthread_t rt_out = (pthread_t)NULL;
1159static enum thread_state out_state = NOTRUNNING;
1160
1161static pthread_t rt_in = (pthread_t)NULL;
1162static enum thread_state in_state = NOTRUNNING;
1163
1164/* Initialize the routing and dispatch threads */
1165int fd_rtdisp_init(void)
1166{
1167 int i;
1168
1169 /* Prepare the array for dispatch */
1170 CHECK_MALLOC( disp_state = calloc(fd_g_config->cnf_dispthr, sizeof(enum thread_state)) );
1171 CHECK_MALLOC( dispatch = calloc(fd_g_config->cnf_dispthr, sizeof(pthread_t)) );
1172
1173 /* Create the threads */
1174 for (i=0; i < fd_g_config->cnf_dispthr; i++) {
1175 CHECK_POSIX( pthread_create( &dispatch[i], NULL, dispatch_thr, &disp_state[i] ) );
1176 }
1177 CHECK_POSIX( pthread_create( &rt_out, NULL, routing_out_thr, &out_state) );
1178 CHECK_POSIX( pthread_create( &rt_in, NULL, routing_in_thr, &in_state) );
1179
1180 /* Later: TODO("Set the thresholds for the queues to create more threads as needed"); */
1181
1182 /* Register the built-in callbacks */
1183 CHECK_FCT( fd_rt_out_register( dont_send_if_no_common_app, NULL, 10, NULL ) );
1184 CHECK_FCT( fd_rt_out_register( score_destination_avp, NULL, 10, NULL ) );
1185
1186 return 0;
1187}
1188
1189/* Ask the thread to terminate after next iteration */
1190int fd_rtdisp_cleanstop(void)
1191{
1192 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1193 order_val = STOP;
1194 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1195
1196 return 0;
1197}
1198
1199static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name)
1200{
1201 TRACE_ENTRY("%p %p", st, thr);
1202 CHECK_PARAMS_DO(st && thr, return);
1203 int terminated;
1204
1205 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1206 terminated = (*st == NOTRUNNING);
1207 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1208
1209
1210 /* Wait for a second for the thread to complete, by monitoring my_state */
1211 if (!terminated) {
1212 TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name);
1213 do {
1214 struct timespec ts, ts_final;
1215
1216 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
1217
1218 ts_final.tv_sec = ts.tv_sec + 1;
1219 ts_final.tv_nsec = ts.tv_nsec;
1220
1221 while (TS_IS_INFERIOR( &ts, &ts_final )) {
1222
1223 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1224 terminated = (*st == NOTRUNNING);
1225 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1226 if (terminated)
1227 break;
1228
1229 usleep(100000);
1230 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
1231 }
1232 } while (0);
1233 }
1234
1235 /* Now stop the thread and reclaim its resources */
1236 CHECK_FCT_DO( fd_thr_term(thr ), /* continue */);
1237
1238}
1239
1240/* Stop the thread after up to one second of wait */
1241int fd_rtdisp_fini(void)
1242{
1243 int i;
1244
1245 /* Destroy the incoming queue */
1246 CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */);
1247
1248 /* Stop the routing IN thread */
1249 stop_thread_delayed(&in_state, &rt_in, "IN routing");
1250
1251 /* Destroy the outgoing queue */
1252 CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */);
1253
1254 /* Stop the routing OUT thread */
1255 stop_thread_delayed(&out_state, &rt_out, "OUT routing");
1256
1257 /* Destroy the local queue */
1258 CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);
1259
1260 /* Stop the Dispatch threads */
1261 if (dispatch != NULL) {
1262 for (i=0; i < fd_g_config->cnf_dispthr; i++) {
1263 stop_thread_delayed(&disp_state[i], &dispatch[i], "Dispatching");
1264 }
1265 free(dispatch);
1266 dispatch = NULL;
1267 }
1268 if (disp_state != NULL) {
1269 free(disp_state);
1270 disp_state = NULL;
1271 }
1272
1273 return 0;
1274}
1275
1276/* Cleanup handlers */
1277int fd_rtdisp_cleanup(void)
1278{
1279 /* Cleanup all remaining handlers */
1280 while (!FD_IS_LIST_EMPTY(&rt_fwd_list)) {
1281 CHECK_FCT_DO( fd_rt_fwd_unregister ( (void *)rt_fwd_list.next, NULL ), /* continue */ );
1282 }
1283 while (!FD_IS_LIST_EMPTY(&rt_out_list)) {
1284 CHECK_FCT_DO( fd_rt_out_unregister ( (void *)rt_out_list.next, NULL ), /* continue */ );
1285 }
1286
1287 fd_disp_unregister_all(); /* destroy remaining handlers */
1288
1289 return 0;
1290}
1291
1292
1293/********************************************************************************/
1294/* For extensions to register a new appl */
1295/********************************************************************************/
1296
1297/* Add an application into the peer's supported apps */
1298int fd_disp_app_support ( struct dict_object * app, struct dict_object * vendor, int auth, int acct )
1299{
1300 application_id_t aid = 0;
1301 vendor_id_t vid = 0;
1302
1303 TRACE_ENTRY("%p %p %d %d", app, vendor, auth, acct);
1304 CHECK_PARAMS( app && (auth || acct) );
1305
1306 {
1307 enum dict_object_type type = 0;
1308 struct dict_application_data data;
1309 CHECK_FCT( fd_dict_gettype(app, &type) );
1310 CHECK_PARAMS( type == DICT_APPLICATION );
1311 CHECK_FCT( fd_dict_getval(app, &data) );
1312 aid = data.application_id;
1313 }
1314
1315 if (vendor) {
1316 enum dict_object_type type = 0;
1317 struct dict_vendor_data data;
1318 CHECK_FCT( fd_dict_gettype(vendor, &type) );
1319 CHECK_PARAMS( type == DICT_VENDOR );
1320 CHECK_FCT( fd_dict_getval(vendor, &data) );
1321 vid = data.vendor_id;
1322 }
1323
1324 return fd_app_merge(&fd_g_config->cnf_apps, aid, vid, auth, acct);
1325}
1326
1327
1328