/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Zebra; see the file COPYING. If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

#include <zebra.h>

#include "log.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"

#include "zebra/rib.h"

#include "fpm/fpm.h"
#include "zebra_fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_
{
  rib_tables_iter_t tables_iter;
  route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
  unsigned long connect_calls;
  unsigned long connect_no_sock;

  unsigned long read_cb_calls;

  unsigned long write_cb_calls;
  unsigned long write_calls;
  unsigned long partial_writes;
  unsigned long max_writes_hit;
  unsigned long t_write_yields;

  unsigned long nop_deletes_skipped;
  unsigned long route_adds;
  unsigned long route_dels;

  unsigned long updates_triggered;
  unsigned long redundant_triggers;
  unsigned long non_fpm_table_triggers;

  unsigned long dests_del_after_update;

  unsigned long t_conn_down_starts;
  unsigned long t_conn_down_dests_processed;
  unsigned long t_conn_down_yields;
  unsigned long t_conn_down_finishes;

  unsigned long t_conn_up_starts;
  unsigned long t_conn_up_dests_processed;
  unsigned long t_conn_up_yields;
  unsigned long t_conn_up_aborts;
  unsigned long t_conn_up_finishes;

} zfpm_stats_t;
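
/*
 * Note: zfpm_stats_compose() below treats zfpm_stats_t as a flat
 * array of unsigned longs, so any field added to this structure must
 * also be an unsigned long counter.
 */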

/*
 * States for the FPM state machine.
 */
typedef enum {

  /*
   * In this state we are not yet ready to connect to the FPM. This
   * can happen when this module is disabled, or if we're cleaning up
   * after a connection has gone down.
   */
  ZFPM_STATE_IDLE,

  /*
   * Ready to talk to the FPM and periodically trying to connect to
   * it.
   */
  ZFPM_STATE_ACTIVE,

  /*
   * In the middle of bringing up a TCP connection. Specifically,
   * waiting for a connect() call to complete asynchronously.
   */
  ZFPM_STATE_CONNECTING,

  /*
   * TCP connection to the FPM is up.
   */
  ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
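
/*
 * The state transitions enforced by the asserts in zfpm_set_state()
 * are:
 *
 *   IDLE        -> ACTIVE       (connect timer scheduled)
 *   ACTIVE      -> CONNECTING   (async connect() in progress)
 *   ACTIVE      -> ESTABLISHED  (connect() completed immediately)
 *   CONNECTING  -> ACTIVE       (connect failed, retry scheduled)
 *   CONNECTING  -> ESTABLISHED  (async connect completed)
 *   ESTABLISHED -> IDLE         (connection went down)
 */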

/*
 * Globals.
 */
typedef struct zfpm_glob_t_
{

  /*
   * True if the FPM module has been enabled.
   */
  int enabled;

  struct thread_master *master;

  zfpm_state_t state;

  /*
   * Port on which the FPM is running.
   */
  int fpm_port;

  /*
   * Queue of rib_dest_t structures that have updates pending for the
   * FPM.
   */
  TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;

  /*
   * Stream socket to the FPM.
   */
  int sock;

  /*
   * Buffers for messages to/from the FPM.
   */
  struct stream *obuf;
  struct stream *ibuf;

  /*
   * Threads for I/O.
   */
  struct thread *t_connect;
  struct thread *t_write;
  struct thread *t_read;

  /*
   * Thread to clean up after the TCP connection to the FPM goes
   * down, and the state that belongs to that thread.
   */
  struct thread *t_conn_down;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_down_state;

  /*
   * Thread to take actions once the TCP conn to the FPM comes up,
   * and the state that belongs to it.
   */
  struct thread *t_conn_up;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_up_state;

  unsigned long connect_calls;
  time_t last_connect_call_time;

  /*
   * Stats from the start of the current statistics interval up to
   * now. These are the counters we typically update in the code.
   */
  zfpm_stats_t stats;

  /*
   * Statistics that were gathered in the last collection interval.
   */
  zfpm_stats_t last_ivl_stats;

  /*
   * Cumulative stats from the last clear to the start of the current
   * statistics interval.
   */
  zfpm_stats_t cumulative_stats;

  /*
   * Stats interval timer.
   */
  struct thread *t_stats;

  /*
   * If non-zero, the last time when statistics were cleared.
   */
  time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_read_cb (struct thread *thread);
static int zfpm_write_cb (struct thread *thread);

static void zfpm_set_state (zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer (const char *reason);
static void zfpm_start_stats_timer (void);

/*
 * zfpm_thread_should_yield
 */
static inline int
zfpm_thread_should_yield (struct thread *t)
{
  return thread_should_yield (t);
}

/*
 * zfpm_state_to_str
 */
static const char *
zfpm_state_to_str (zfpm_state_t state)
{
  switch (state)
    {

    case ZFPM_STATE_IDLE:
      return "idle";

    case ZFPM_STATE_ACTIVE:
      return "active";

    case ZFPM_STATE_CONNECTING:
      return "connecting";

    case ZFPM_STATE_ESTABLISHED:
      return "established";

    default:
      return "unknown";
    }
}

/*
 * zfpm_get_time
 */
static time_t
zfpm_get_time (void)
{
  struct timeval tv;

  if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
    zlog_warn ("FPM: quagga_gettime failed!!");

  return tv.tv_sec;
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t
zfpm_get_elapsed_time (time_t reference)
{
  time_t now;

  now = zfpm_get_time ();

  if (now < reference)
    {
      assert (0);
      return 0;
    }

  return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the FPM.
 */
static inline int
zfpm_is_table_for_fpm (struct route_table *table)
{
  rib_table_info_t *info;

  info = rib_table_info (table);

  /*
   * We only send the unicast tables in the main instance to the FPM
   * at this point.
   */
  if (info->vrf->id != 0)
    return 0;

  if (info->safi != SAFI_UNICAST)
    return 0;

  return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void
zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
{
  memset (iter, 0, sizeof (*iter));
  rib_tables_iter_init (&iter->tables_iter);

  /*
   * This is a hack, but it makes implementing 'next' easier by
   * ensuring that route_table_iter_next() will return NULL the first
   * time we call it.
   */
  route_table_iter_init (&iter->iter, NULL);
  route_table_iter_cleanup (&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *
zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
{
  struct route_node *rn;
  struct route_table *table;

  while (1)
    {
      rn = route_table_iter_next (&iter->iter);
      if (rn)
        return rn;

      /*
       * We've made our way through this table, go to the next one.
       */
      route_table_iter_cleanup (&iter->iter);

      while ((table = rib_tables_iter_next (&iter->tables_iter)))
        {
          if (zfpm_is_table_for_fpm (table))
            break;
        }

      if (!table)
        return NULL;

      route_table_iter_init (&iter->iter, table);
    }

  return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void
zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_pause (&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void
zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_cleanup (&iter->iter);
  rib_tables_iter_cleanup (&iter->tables_iter);
}

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void
zfpm_stats_init (zfpm_stats_t *stats)
{
  memset (stats, 0, sizeof (*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void
zfpm_stats_reset (zfpm_stats_t *stats)
{
  zfpm_stats_init (stats);
}

/*
 * zfpm_stats_copy
 */
static inline void
zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
{
  memcpy (dest, src, sizeof (*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters, and that its size is a
 * multiple of sizeof (unsigned long). This can easily be changed when
 * necessary.
 */
static void
zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
                    zfpm_stats_t *result)
{
  const unsigned long *p1, *p2;
  unsigned long *result_p;
  int i, num_counters;

  p1 = (const unsigned long *) s1;
  p2 = (const unsigned long *) s2;
  result_p = (unsigned long *) result;

  num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));

  for (i = 0; i < num_counters; i++)
    {
      result_p[i] = p1[i] + p2[i];
    }
}

/*
 * zfpm_read_on
 */
static inline void
zfpm_read_on (void)
{
  assert (!zfpm_g->t_read);
  assert (zfpm_g->sock >= 0);

  THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
                  zfpm_g->sock);
}

/*
 * zfpm_write_on
 */
static inline void
zfpm_write_on (void)
{
  assert (!zfpm_g->t_write);
  assert (zfpm_g->sock >= 0);

  THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
                   zfpm_g->sock);
}

/*
 * zfpm_read_off
 */
static inline void
zfpm_read_off (void)
{
  THREAD_READ_OFF (zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void
zfpm_write_off (void)
{
  THREAD_WRITE_OFF (zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int
zfpm_conn_up_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->t_conn_up);
  zfpm_g->t_conn_up = NULL;

  iter = &zfpm_g->t_conn_up_state.iter;

  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    {
      zfpm_debug ("Connection not up anymore, conn_up thread aborting");
      zfpm_g->stats.t_conn_up_aborts++;
      goto done;
    }

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          zfpm_g->stats.t_conn_up_dests_processed++;
          zfpm_trigger_update (rnode, NULL);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_up_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                                 zfpm_conn_up_thread_cb,
                                                 0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_up_finishes++;

 done:
  zfpm_rnodes_iter_cleanup (iter);
  return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void
zfpm_connection_up (const char *detail)
{
  assert (zfpm_g->sock >= 0);
  zfpm_read_on ();
  zfpm_write_on ();
  zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);

  /*
   * Start thread to push existing routes to the FPM.
   */
  assert (!zfpm_g->t_conn_up);

  zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);

  zfpm_debug ("Starting conn_up thread");
  zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                             zfpm_conn_up_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void
zfpm_connect_check (void)
{
  int status;
  socklen_t slen;
  int ret;

  zfpm_read_off ();
  zfpm_write_off ();

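  /*
   * The outcome of a non-blocking connect() is reported via the
   * socket's SO_ERROR option once the socket becomes readable or
   * writable.
   */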
  slen = sizeof (status);
  ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
                    &slen);

  if (ret >= 0 && status == 0)
    {
      zfpm_connection_up ("async connect complete");
      return;
    }

  /*
   * getsockopt() failed or indicated an error on the socket.
   */
  close (zfpm_g->sock);
  zfpm_g->sock = -1;

  zfpm_start_connect_timer ("getsockopt() after async connect failed");
  return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int
zfpm_conn_down_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->state == ZFPM_STATE_IDLE);

  assert (zfpm_g->t_conn_down);
  zfpm_g->t_conn_down = NULL;

  iter = &zfpm_g->t_conn_down_state.iter;

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
            {
              TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
            }

          UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
          UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);

          zfpm_g->stats.t_conn_down_dests_processed++;

          /*
           * Check if the dest should be deleted.
           */
          rib_gc_dest (rnode);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_down_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                                   zfpm_conn_down_thread_cb,
                                                   0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_down_finishes++;
  zfpm_rnodes_iter_cleanup (iter);

  /*
   * Start the process of connecting to the FPM again.
   */
  zfpm_start_connect_timer ("cleanup complete");
  return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void
zfpm_connection_down (const char *detail)
{
  if (!detail)
    detail = "unknown";

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);

  zlog_info ("connection to the FPM has gone down: %s", detail);

  zfpm_read_off ();
  zfpm_write_off ();

  stream_reset (zfpm_g->ibuf);
  stream_reset (zfpm_g->obuf);

  if (zfpm_g->sock >= 0) {
    close (zfpm_g->sock);
    zfpm_g->sock = -1;
  }

  /*
   * Start thread to clean up state after the connection goes down.
   */
  assert (!zfpm_g->t_conn_down);
  zfpm_debug ("Starting conn_down thread");
  zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
  zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                               zfpm_conn_down_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_down_starts++;

  zfpm_set_state (ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int
zfpm_read_cb (struct thread *thread)
{
  size_t already;
  struct stream *ibuf;
  uint16_t msg_len;
  fpm_msg_hdr_t *hdr;

  zfpm_g->stats.read_cb_calls++;
  assert (zfpm_g->t_read);
  zfpm_g->t_read = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  ibuf = zfpm_g->ibuf;

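  /*
   * The end pointer of ibuf records how many bytes of the current
   * message we have accumulated so far. A short read below leaves the
   * partial message in ibuf and re-arms the read, so we pick up where
   * we left off on the next invocation.
   */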
  already = stream_get_endp (ibuf);
  if (already < FPM_MSG_HDR_LEN)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("closed socket in read");
          return 0;
        }

      if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
        goto done;

      already = FPM_MSG_HDR_LEN;
    }

  stream_set_getp (ibuf, 0);

  hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);

  if (!fpm_msg_hdr_ok (hdr))
    {
      zfpm_connection_down ("invalid message header");
      return 0;
    }

  msg_len = fpm_msg_len (hdr);

  /*
   * Read out the rest of the packet.
   */
  if (already < msg_len)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);

      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("failed to read message");
          return 0;
        }

      if (nbyte != (ssize_t) (msg_len - already))
        goto done;
    }

  zfpm_debug ("Read out a full fpm message");

  /*
   * Just throw it away for now.
   */
  stream_reset (ibuf);

 done:
  zfpm_read_on ();
  return 0;
}

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int
zfpm_writes_pending (void)
{

  /*
   * Check if there is any data in the outbound buffer that has not
   * been written to the socket yet.
   */
  if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
    return 1;

  /*
   * Check if there are any prefixes on the outbound queue.
   */
  if (!TAILQ_EMPTY (&zfpm_g->dest_q))
    return 1;

  return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int
zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
                   size_t in_buf_len)
{
#ifndef HAVE_NETLINK
  return 0;
#else

  int cmd;

  cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;

  return zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);

#endif /* HAVE_NETLINK */
}

/*
 * zfpm_route_for_update
 *
 * Returns the rib that is to be sent to the FPM for a given dest.
 */
static struct rib *
zfpm_route_for_update (rib_dest_t *dest)
{
  struct rib *rib;

  RIB_DEST_FOREACH_ROUTE (dest, rib)
    {
      if (!CHECK_FLAG (rib->flags, ZEBRA_FLAG_SELECTED))
        continue;

      return rib;
    }

  /*
   * We have no route for this destination.
   */
  return NULL;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void
zfpm_build_updates (void)
{
  struct stream *s;
  rib_dest_t *dest;
  unsigned char *buf, *data, *buf_end;
  size_t msg_len;
  size_t data_len;
  fpm_msg_hdr_t *hdr;
  struct rib *rib;
  int is_add, write_msg;

  s = zfpm_g->obuf;

  assert (stream_empty (s));

  do {

    /*
     * Make sure there is enough space to write another message.
     */
    if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
      break;

    buf = STREAM_DATA (s) + stream_get_endp (s);
    buf_end = buf + STREAM_WRITEABLE (s);

    dest = TAILQ_FIRST (&zfpm_g->dest_q);
    if (!dest)
      break;

    assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));

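    /*
     * Each outgoing message consists of an fpm_msg_hdr_t followed by
     * a netlink-encoded payload. The msg_len field covers the header
     * plus the encoded data and is written in network byte order
     * below.
     */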
    hdr = (fpm_msg_hdr_t *) buf;
    hdr->version = FPM_PROTO_VERSION;
    hdr->msg_type = FPM_MSG_TYPE_NETLINK;

    data = fpm_msg_data (hdr);

    rib = zfpm_route_for_update (dest);
    is_add = rib ? 1 : 0;

    write_msg = 1;

    /*
     * If this is a route deletion, and we have not sent the route to
     * the FPM previously, skip it.
     */
    if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
      {
        write_msg = 0;
        zfpm_g->stats.nop_deletes_skipped++;
      }

    if (write_msg) {
      data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data);

      assert (data_len);
      if (data_len)
        {
          msg_len = fpm_data_len_to_msg_len (data_len);
          hdr->msg_len = htons (msg_len);
          stream_forward_endp (s, msg_len);

          if (is_add)
            zfpm_g->stats.route_adds++;
          else
            zfpm_g->stats.route_dels++;
        }
    }

    /*
     * Remove the dest from the queue, and reset the flag.
     */
    UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
    TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);

    if (is_add)
      {
        SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }
    else
      {
        UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }

    /*
     * Delete the destination if necessary.
     */
    if (rib_gc_dest (dest->rnode))
      zfpm_g->stats.dests_del_after_update++;

  } while (1);

}

/*
 * zfpm_write_cb
 */
static int
zfpm_write_cb (struct thread *thread)
{
  struct stream *s;
  int num_writes;

  zfpm_g->stats.write_cb_calls++;
  assert (zfpm_g->t_write);
  zfpm_g->t_write = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

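  /*
   * Write in a loop, but bound the amount of work done in one
   * invocation: stop after ZFPM_MAX_WRITES_PER_RUN calls to write(),
   * or earlier if this thread should yield, so that other work in the
   * event loop gets a chance to run.
   */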
  num_writes = 0;

  do
    {
      int bytes_to_write, bytes_written;

      s = zfpm_g->obuf;

      /*
       * If the stream is empty, try to fill it up with data.
       */
      if (stream_empty (s))
        {
          zfpm_build_updates ();
        }

      bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
      if (!bytes_to_write)
        break;

      bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
      zfpm_g->stats.write_calls++;
      num_writes++;

      if (bytes_written < 0)
        {
          if (ERRNO_IO_RETRY (errno))
            break;

          zfpm_connection_down ("failed to write to socket");
          return 0;
        }

      if (bytes_written != bytes_to_write)
        {

          /*
           * Partial write.
           */
          stream_forward_getp (s, bytes_written);
          zfpm_g->stats.partial_writes++;
          break;
        }

      /*
       * We've written out the entire contents of the stream.
       */
      stream_reset (s);

      if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
        {
          zfpm_g->stats.max_writes_hit++;
          break;
        }

      if (zfpm_thread_should_yield (thread))
        {
          zfpm_g->stats.t_write_yields++;
          break;
        }
    } while (1);

  if (zfpm_writes_pending ())
    zfpm_write_on ();

  return 0;
}

/*
 * zfpm_connect_cb
 */
static int
zfpm_connect_cb (struct thread *t)
{
  int sock, ret;
  struct sockaddr_in serv;

  assert (zfpm_g->t_connect);
  zfpm_g->t_connect = NULL;
  assert (zfpm_g->state == ZFPM_STATE_ACTIVE);

  sock = socket (AF_INET, SOCK_STREAM, 0);
  if (sock < 0)
    {
      zfpm_debug ("Failed to create socket for connect(): %s",
                  strerror (errno));
      zfpm_g->stats.connect_no_sock++;
      return 0;
    }

  set_nonblocking (sock);

  /* Fill in the address of the FPM server. */
  memset (&serv, 0, sizeof (serv));
  serv.sin_family = AF_INET;
  serv.sin_port = htons (zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
  serv.sin_len = sizeof (struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
  serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);

  /*
   * Connect to the FPM.
   */
  zfpm_g->connect_calls++;
  zfpm_g->stats.connect_calls++;
  zfpm_g->last_connect_call_time = zfpm_get_time ();

  ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
  if (ret >= 0)
    {
      zfpm_g->sock = sock;
      zfpm_connection_up ("connect succeeded");
      return 1;
    }

  if (errno == EINPROGRESS)
    {
      zfpm_g->sock = sock;
      zfpm_read_on ();
      zfpm_write_on ();
      zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
      return 0;
    }

  zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
  close (sock);

  /*
   * Restart timer for retrying connection.
   */
  zfpm_start_connect_timer ("connect() failed");
  return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void
zfpm_set_state (zfpm_state_t state, const char *reason)
{
  zfpm_state_t cur_state = zfpm_g->state;

  if (!reason)
    reason = "Unknown";

  if (state == cur_state)
    return;

  zfpm_debug ("beginning state transition %s -> %s. Reason: %s",
              zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
              reason);

  switch (state) {

  case ZFPM_STATE_IDLE:
    assert (cur_state == ZFPM_STATE_ESTABLISHED);
    break;

  case ZFPM_STATE_ACTIVE:
    assert (cur_state == ZFPM_STATE_IDLE ||
            cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->t_connect);
    break;

  case ZFPM_STATE_CONNECTING:
    assert (zfpm_g->sock >= 0);
    assert (cur_state == ZFPM_STATE_ACTIVE);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;

  case ZFPM_STATE_ESTABLISHED:
    assert (cur_state == ZFPM_STATE_ACTIVE ||
            cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->sock >= 0);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;
  }

  zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long
zfpm_calc_connect_delay (void)
{
  time_t elapsed;

  /*
   * Return 0 if this is our first attempt to connect.
   */
  if (zfpm_g->connect_calls == 0)
    {
      return 0;
    }

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);

  if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
    return 0;
  }

  return ZFPM_CONNECT_RETRY_IVL - elapsed;
}

/*
 * zfpm_start_connect_timer
 */
static void
zfpm_start_connect_timer (const char *reason)
{
  long delay_secs;

  assert (!zfpm_g->t_connect);
  assert (zfpm_g->sock < 0);

  assert (zfpm_g->state == ZFPM_STATE_IDLE ||
          zfpm_g->state == ZFPM_STATE_ACTIVE ||
          zfpm_g->state == ZFPM_STATE_CONNECTING);

  delay_secs = zfpm_calc_connect_delay ();
  zfpm_debug ("scheduling connect in %ld seconds", delay_secs);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
                   delay_secs);
  zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int
zfpm_is_enabled (void)
{
  return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int
zfpm_conn_is_up (void)
{
  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    return 0;

  assert (zfpm_g->sock >= 0);

  return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
void
zfpm_trigger_update (struct route_node *rn, const char *reason)
{
  rib_dest_t *dest;
  char buf[PREFIX_STRLEN];

  /*
   * Ignore if the connection is down. We will update the FPM about
   * all destinations once the connection comes up.
   */
  if (!zfpm_conn_is_up ())
    return;

  dest = rib_dest_from_rnode (rn);

  /*
   * Ignore the trigger if the dest is not in a table that we would
   * send to the FPM.
   */
  if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
    {
      zfpm_g->stats.non_fpm_table_triggers++;
      return;
    }

  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
    zfpm_g->stats.redundant_triggers++;
    return;
  }

  if (reason)
    {
      zfpm_debug ("%s triggering update to FPM - Reason: %s",
                  prefix2str (&rn->p, buf, sizeof (buf)), reason);
    }

  SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
  zfpm_g->stats.updates_triggered++;

  /*
   * Make sure that writes are enabled.
   */
  if (zfpm_g->t_write)
    return;

  zfpm_write_on ();
}

/*
 * zfpm_stats_timer_cb
 */
static int
zfpm_stats_timer_cb (struct thread *t)
{
  assert (zfpm_g->t_stats);
  zfpm_g->t_stats = NULL;

  /*
   * Remember the stats collected in the last interval for display
   * purposes.
   */
  zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);

  /*
   * Add the current set of stats into the cumulative statistics.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &zfpm_g->cumulative_stats);

  /*
   * Start collecting stats afresh over the next interval.
   */
  zfpm_stats_reset (&zfpm_g->stats);

  zfpm_start_stats_timer ();

  return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void
zfpm_stop_stats_timer (void)
{
  if (!zfpm_g->t_stats)
    return;

  zfpm_debug ("Stopping existing stats timer");
  THREAD_TIMER_OFF (zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
static void
zfpm_start_stats_timer (void)
{
  assert (!zfpm_g->t_stats);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
                   ZFPM_STATS_IVL_SECS);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                          \
  do {                                                                   \
    vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter,  \
             zfpm_g->last_ivl_stats.counter, VTY_NEWLINE);               \
  } while (0)

/*
 * zfpm_show_stats
 */
static void
zfpm_show_stats (struct vty *vty)
{
  zfpm_stats_t total_stats;
  time_t elapsed;

  vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
           "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);

  /*
   * Compute the total stats up to this instant.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &total_stats);

  ZFPM_SHOW_STAT (connect_calls);
  ZFPM_SHOW_STAT (connect_no_sock);
  ZFPM_SHOW_STAT (read_cb_calls);
  ZFPM_SHOW_STAT (write_cb_calls);
  ZFPM_SHOW_STAT (write_calls);
  ZFPM_SHOW_STAT (partial_writes);
  ZFPM_SHOW_STAT (max_writes_hit);
  ZFPM_SHOW_STAT (t_write_yields);
  ZFPM_SHOW_STAT (nop_deletes_skipped);
  ZFPM_SHOW_STAT (route_adds);
  ZFPM_SHOW_STAT (route_dels);
  ZFPM_SHOW_STAT (updates_triggered);
  ZFPM_SHOW_STAT (non_fpm_table_triggers);
  ZFPM_SHOW_STAT (redundant_triggers);
  ZFPM_SHOW_STAT (dests_del_after_update);
  ZFPM_SHOW_STAT (t_conn_down_starts);
  ZFPM_SHOW_STAT (t_conn_down_dests_processed);
  ZFPM_SHOW_STAT (t_conn_down_yields);
  ZFPM_SHOW_STAT (t_conn_down_finishes);
  ZFPM_SHOW_STAT (t_conn_up_starts);
  ZFPM_SHOW_STAT (t_conn_up_dests_processed);
  ZFPM_SHOW_STAT (t_conn_up_yields);
  ZFPM_SHOW_STAT (t_conn_up_aborts);
  ZFPM_SHOW_STAT (t_conn_up_finishes);

  if (!zfpm_g->last_stats_clear_time)
    return;

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);

  vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
           (unsigned long) elapsed, VTY_NEWLINE);
}

/*
 * zfpm_clear_stats
 */
static void
zfpm_clear_stats (struct vty *vty)
{
  if (!zfpm_is_enabled ())
    {
      vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
      return;
    }

  zfpm_stats_reset (&zfpm_g->stats);
  zfpm_stats_reset (&zfpm_g->last_ivl_stats);
  zfpm_stats_reset (&zfpm_g->cumulative_stats);

  zfpm_stop_stats_timer ();
  zfpm_start_stats_timer ();

  zfpm_g->last_stats_clear_time = zfpm_get_time ();

  vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_show_stats (vty);
  return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_clear_stats (vty);
  return CMD_SUCCESS;
}

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] port port at which FPM is running.
 * @param[in] enable TRUE if the zebra FPM module should be enabled
 *
 * Returns TRUE on success.
 */
int
zfpm_init (struct thread_master *master, int enable, uint16_t port)
{
  static int initialized = 0;

  if (initialized) {
    return 1;
  }

  initialized = 1;

  memset (zfpm_g, 0, sizeof (*zfpm_g));
  zfpm_g->master = master;
  TAILQ_INIT (&zfpm_g->dest_q);
  zfpm_g->sock = -1;
  zfpm_g->state = ZFPM_STATE_IDLE;

  /*
   * Netlink must currently be available for the Zebra-FPM interface
   * to be enabled.
   */
#ifndef HAVE_NETLINK
  enable = 0;
#endif

  zfpm_g->enabled = enable;

  zfpm_stats_init (&zfpm_g->stats);
  zfpm_stats_init (&zfpm_g->last_ivl_stats);
  zfpm_stats_init (&zfpm_g->cumulative_stats);

  install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
  install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);

  if (!enable) {
    return 1;
  }

  if (!port)
    port = FPM_DEFAULT_PORT;

  zfpm_g->fpm_port = port;

  zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
  zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);

  zfpm_start_stats_timer ();
  zfpm_start_connect_timer ("initialized");

  return 1;
}