/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Zebra; see the file COPYING. If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

#include <zebra.h>

#include "log.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"

#include "zebra/rib.h"

#include "fpm/fpm.h"
#include "zebra_fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_
{
  rib_tables_iter_t tables_iter;
  route_table_iter_t iter;
} zfpm_rnodes_iter_t;
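
/*
 * In the iterator above, 'tables_iter' walks the set of RIB tables
 * that may be relevant to the FPM, while 'iter' walks the route_node
 * structures within the table currently being visited (see
 * zfpm_rnodes_iter_next() below).
 */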

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
  unsigned long connect_calls;
  unsigned long connect_no_sock;

  unsigned long read_cb_calls;

  unsigned long write_cb_calls;
  unsigned long write_calls;
  unsigned long partial_writes;
  unsigned long max_writes_hit;
  unsigned long t_write_yields;

  unsigned long nop_deletes_skipped;
  unsigned long route_adds;
  unsigned long route_dels;

  unsigned long updates_triggered;
  unsigned long redundant_triggers;
  unsigned long non_fpm_table_triggers;

  unsigned long dests_del_after_update;

  unsigned long t_conn_down_starts;
  unsigned long t_conn_down_dests_processed;
  unsigned long t_conn_down_yields;
  unsigned long t_conn_down_finishes;

  unsigned long t_conn_up_starts;
  unsigned long t_conn_up_dests_processed;
  unsigned long t_conn_up_yields;
  unsigned long t_conn_up_aborts;
  unsigned long t_conn_up_finishes;

} zfpm_stats_t;
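
/*
 * Note: zfpm_stats_compose() below assumes that this structure is
 * composed entirely of 'unsigned long' counters, and sums two stats
 * blocks by treating them as flat arrays. Any field added here should
 * keep that assumption intact.
 */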

/*
 * States for the FPM state machine.
 */
typedef enum {

  /*
   * In this state we are not yet ready to connect to the FPM. This
   * can happen when this module is disabled, or if we're cleaning up
   * after a connection has gone down.
   */
  ZFPM_STATE_IDLE,

  /*
   * Ready to talk to the FPM and periodically trying to connect to
   * it.
   */
  ZFPM_STATE_ACTIVE,

  /*
   * In the middle of bringing up a TCP connection. Specifically,
   * waiting for a connect() call to complete asynchronously.
   */
  ZFPM_STATE_CONNECTING,

  /*
   * TCP connection to the FPM is up.
   */
  ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
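
/*
 * Summary of the transitions that zfpm_set_state() permits (see the
 * assertions in that function):
 *
 *   IDLE        -> ACTIVE       connect retry timer started
 *   ACTIVE      -> CONNECTING   async connect() in progress
 *   ACTIVE      -> ESTABLISHED  connect() succeeded immediately
 *   CONNECTING  -> ESTABLISHED  async connect() completed
 *   CONNECTING  -> ACTIVE       connect attempt failed, retrying
 *   ESTABLISHED -> IDLE         connection to the FPM went down
 */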

/*
 * Globals.
 */
typedef struct zfpm_glob_t_
{

  /*
   * True if the FPM module has been enabled.
   */
  int enabled;

  struct thread_master *master;

  zfpm_state_t state;

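  /*
   * IP address of the FPM server, in network byte order.
   */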
  in_addr_t fpm_server;
  /*
   * Port on which the FPM is running.
   */
  int fpm_port;

  /*
   * List of rib_dest_t structures to be processed
   */
  TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;

  /*
   * Stream socket to the FPM.
   */
  int sock;

  /*
   * Buffers for messages to/from the FPM.
   */
  struct stream *obuf;
  struct stream *ibuf;

  /*
   * Threads for I/O.
   */
  struct thread *t_connect;
  struct thread *t_write;
  struct thread *t_read;

  /*
   * Thread to clean up after the TCP connection to the FPM goes down
   * and the state that belongs to it.
   */
  struct thread *t_conn_down;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_down_state;

  /*
   * Thread to take actions once the TCP conn to the FPM comes up, and
   * the state that belongs to it.
   */
  struct thread *t_conn_up;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_up_state;

  unsigned long connect_calls;
  time_t last_connect_call_time;

  /*
   * Stats from the start of the current statistics interval up to
   * now. These are the counters we typically update in the code.
   */
  zfpm_stats_t stats;

  /*
   * Statistics that were gathered in the last collection interval.
   */
  zfpm_stats_t last_ivl_stats;

  /*
   * Cumulative stats from the last clear to the start of the current
   * statistics interval.
   */
  zfpm_stats_t cumulative_stats;

  /*
   * Stats interval timer.
   */
  struct thread *t_stats;

  /*
   * If non-zero, the last time when statistics were cleared.
   */
  time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_read_cb (struct thread *thread);
static int zfpm_write_cb (struct thread *thread);

static void zfpm_set_state (zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer (const char *reason);
static void zfpm_start_stats_timer (void);

/*
 * zfpm_thread_should_yield
 */
static inline int
zfpm_thread_should_yield (struct thread *t)
{
  return thread_should_yield (t);
}

/*
 * zfpm_state_to_str
 */
static const char *
zfpm_state_to_str (zfpm_state_t state)
{
  switch (state)
    {

    case ZFPM_STATE_IDLE:
      return "idle";

    case ZFPM_STATE_ACTIVE:
      return "active";

    case ZFPM_STATE_CONNECTING:
      return "connecting";

    case ZFPM_STATE_ESTABLISHED:
      return "established";

    default:
      return "unknown";
    }
}

/*
 * zfpm_get_time
 */
static time_t
zfpm_get_time (void)
{
  struct timeval tv;

  if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
    zlog_warn ("FPM: quagga_gettime failed!!");

  return tv.tv_sec;
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t
zfpm_get_elapsed_time (time_t reference)
{
  time_t now;

  now = zfpm_get_time ();

  if (now < reference)
    {
      assert (0);
      return 0;
    }

  return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the FPM.
 */
static inline int
zfpm_is_table_for_fpm (struct route_table *table)
{
  rib_table_info_t *info;

  info = rib_table_info (table);

  /*
   * We only send the unicast tables in the main instance to the FPM
   * at this point.
   */
  if (info->vrf->id != 0)
    return 0;

  if (info->safi != SAFI_UNICAST)
    return 0;

  return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void
zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
{
  memset (iter, 0, sizeof (*iter));
  rib_tables_iter_init (&iter->tables_iter);

  /*
   * This is a hack, but it makes implementing 'next' easier by
   * ensuring that route_table_iter_next() will return NULL the first
   * time we call it.
   */
  route_table_iter_init (&iter->iter, NULL);
  route_table_iter_cleanup (&iter->iter);
}
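
/*
 * Typical use of the iterator, as in the conn_up/conn_down threads
 * below (a sketch; yielding and bookkeeping elided):
 *
 *   zfpm_rnodes_iter_t iter;
 *   struct route_node *rn;
 *
 *   zfpm_rnodes_iter_init (&iter);
 *   while ((rn = zfpm_rnodes_iter_next (&iter)))
 *     {
 *       ... process rn; to yield, call zfpm_rnodes_iter_pause () and
 *       resume by calling zfpm_rnodes_iter_next () again later ...
 *     }
 *   zfpm_rnodes_iter_cleanup (&iter);
 */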

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *
zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
{
  struct route_node *rn;
  struct route_table *table;

  while (1)
    {
      rn = route_table_iter_next (&iter->iter);
      if (rn)
        return rn;

      /*
       * We've made our way through this table, go to the next one.
       */
      route_table_iter_cleanup (&iter->iter);

      while ((table = rib_tables_iter_next (&iter->tables_iter)))
        {
          if (zfpm_is_table_for_fpm (table))
            break;
        }

      if (!table)
        return NULL;

      route_table_iter_init (&iter->iter, table);
    }

  return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void
zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_pause (&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void
zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_cleanup (&iter->iter);
  rib_tables_iter_cleanup (&iter->tables_iter);
}

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void
zfpm_stats_init (zfpm_stats_t *stats)
{
  memset (stats, 0, sizeof (*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void
zfpm_stats_reset (zfpm_stats_t *stats)
{
  zfpm_stats_init (stats);
}

/*
 * zfpm_stats_copy
 */
static inline void
zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
{
  memcpy (dest, src, sizeof (*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void
zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
                    zfpm_stats_t *result)
{
  const unsigned long *p1, *p2;
  unsigned long *result_p;
  int i, num_counters;

  p1 = (const unsigned long *) s1;
  p2 = (const unsigned long *) s2;
  result_p = (unsigned long *) result;

  num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));

  for (i = 0; i < num_counters; i++)
    {
      result_p[i] = p1[i] + p2[i];
    }
}

/*
 * zfpm_read_on
 */
static inline void
zfpm_read_on (void)
{
  assert (!zfpm_g->t_read);
  assert (zfpm_g->sock >= 0);

  THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
                  zfpm_g->sock);
}

/*
 * zfpm_write_on
 */
static inline void
zfpm_write_on (void)
{
  assert (!zfpm_g->t_write);
  assert (zfpm_g->sock >= 0);

  THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
                   zfpm_g->sock);
}

/*
 * zfpm_read_off
 */
static inline void
zfpm_read_off (void)
{
  THREAD_READ_OFF (zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void
zfpm_write_off (void)
{
  THREAD_WRITE_OFF (zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int
zfpm_conn_up_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->t_conn_up);
  zfpm_g->t_conn_up = NULL;

  iter = &zfpm_g->t_conn_up_state.iter;

  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    {
      zfpm_debug ("Connection not up anymore, conn_up thread aborting");
      zfpm_g->stats.t_conn_up_aborts++;
      goto done;
    }

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          zfpm_g->stats.t_conn_up_dests_processed++;
          zfpm_trigger_update (rnode, NULL);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_up_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                                 zfpm_conn_up_thread_cb,
                                                 0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_up_finishes++;

 done:
  zfpm_rnodes_iter_cleanup (iter);
  return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void
zfpm_connection_up (const char *detail)
{
  assert (zfpm_g->sock >= 0);
  zfpm_read_on ();
  zfpm_write_on ();
  zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);

  /*
   * Start thread to push existing routes to the FPM.
   */
  assert (!zfpm_g->t_conn_up);

  zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);

  zfpm_debug ("Starting conn_up thread");
  zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                             zfpm_conn_up_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void
zfpm_connect_check (void)
{
  int status;
  socklen_t slen;
  int ret;

  zfpm_read_off ();
  zfpm_write_off ();

  slen = sizeof (status);
  ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
                    &slen);

  if (ret >= 0 && status == 0)
    {
      zfpm_connection_up ("async connect complete");
      return;
    }

  /*
   * getsockopt() failed or indicated an error on the socket.
   */
  close (zfpm_g->sock);
  zfpm_g->sock = -1;

  zfpm_start_connect_timer ("getsockopt() after async connect failed");
  return;
}
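
/*
 * A note on the non-blocking connect sequence: zfpm_connect_cb()
 * below calls connect() on a non-blocking socket. When that call
 * returns EINPROGRESS, we register for both read and write events;
 * whichever event fires first while in ZFPM_STATE_CONNECTING lands in
 * this function (via zfpm_read_cb()/zfpm_write_cb()), which then uses
 * SO_ERROR to determine whether the connection actually succeeded.
 */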

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int
zfpm_conn_down_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->state == ZFPM_STATE_IDLE);

  assert (zfpm_g->t_conn_down);
  zfpm_g->t_conn_down = NULL;

  iter = &zfpm_g->t_conn_down_state.iter;

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
            {
              TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
            }

          UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
          UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);

          zfpm_g->stats.t_conn_down_dests_processed++;

          /*
           * Check if the dest should be deleted.
           */
          rib_gc_dest (rnode);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_down_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                                   zfpm_conn_down_thread_cb,
                                                   0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_down_finishes++;
  zfpm_rnodes_iter_cleanup (iter);

  /*
   * Start the process of connecting to the FPM again.
   */
  zfpm_start_connect_timer ("cleanup complete");
  return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void
zfpm_connection_down (const char *detail)
{
  if (!detail)
    detail = "unknown";

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);

  zlog_info ("connection to the FPM has gone down: %s", detail);

  zfpm_read_off ();
  zfpm_write_off ();

  stream_reset (zfpm_g->ibuf);
  stream_reset (zfpm_g->obuf);

  if (zfpm_g->sock >= 0) {
    close (zfpm_g->sock);
    zfpm_g->sock = -1;
  }

  /*
   * Start thread to clean up state after the connection goes down.
   */
  assert (!zfpm_g->t_conn_down);
  zfpm_debug ("Starting conn_down thread");
  zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
  zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                               zfpm_conn_down_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_down_starts++;

  zfpm_set_state (ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int
zfpm_read_cb (struct thread *thread)
{
  size_t already;
  struct stream *ibuf;
  uint16_t msg_len;
  fpm_msg_hdr_t *hdr;

  zfpm_g->stats.read_cb_calls++;
  assert (zfpm_g->t_read);
  zfpm_g->t_read = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  ibuf = zfpm_g->ibuf;

  already = stream_get_endp (ibuf);
  if (already < FPM_MSG_HDR_LEN)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("closed socket in read");
          return 0;
        }

      if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
        goto done;

      already = FPM_MSG_HDR_LEN;
    }

  stream_set_getp (ibuf, 0);

  hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);

  if (!fpm_msg_hdr_ok (hdr))
    {
      zfpm_connection_down ("invalid message header");
      return 0;
    }

  msg_len = fpm_msg_len (hdr);

  /*
   * Read out the rest of the packet.
   */
  if (already < msg_len)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);

      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("failed to read message");
          return 0;
        }

      if (nbyte != (ssize_t) (msg_len - already))
        goto done;
    }

  zfpm_debug ("Read out a full fpm message");

  /*
   * Just throw it away for now.
   */
  stream_reset (ibuf);

 done:
  zfpm_read_on ();
  return 0;
}
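
/*
 * For reference, the stream parsed above carries a sequence of FPM
 * messages, each starting with a fpm_msg_hdr_t (version, message type
 * and total message length; see fpm/fpm.h) followed by the payload.
 * fpm_msg_len() returns the length of the whole message including the
 * header, which is why the header bytes already consumed are counted
 * against 'msg_len' when reading the remainder.
 */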

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int
zfpm_writes_pending (void)
{

  /*
   * Check if there is any data in the outbound buffer that has not
   * been written to the socket yet.
   */
  if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
    return 1;

  /*
   * Check if there are any prefixes on the outbound queue.
   */
  if (!TAILQ_EMPTY (&zfpm_g->dest_q))
    return 1;

  return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int
zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
                   size_t in_buf_len)
{
#ifndef HAVE_NETLINK
  return 0;
#else

  int cmd;

  cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;

  return zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);

#endif /* HAVE_NETLINK */
}

/*
 * zfpm_route_for_update
 *
 * Returns the rib that is to be sent to the FPM for a given dest.
 */
static struct rib *
zfpm_route_for_update (rib_dest_t *dest)
{
  struct rib *rib;

  RIB_DEST_FOREACH_ROUTE (dest, rib)
    {
      if (!CHECK_FLAG (rib->flags, ZEBRA_FLAG_SELECTED))
        continue;

      return rib;
    }

  /*
   * We have no route for this destination.
   */
  return NULL;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void
zfpm_build_updates (void)
{
  struct stream *s;
  rib_dest_t *dest;
  unsigned char *buf, *data, *buf_end;
  size_t msg_len;
  size_t data_len;
  fpm_msg_hdr_t *hdr;
  struct rib *rib;
  int is_add, write_msg;

  s = zfpm_g->obuf;

  assert (stream_empty (s));

  do {

    /*
     * Make sure there is enough space to write another message.
     */
    if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
      break;

    buf = STREAM_DATA (s) + stream_get_endp (s);
    buf_end = buf + STREAM_WRITEABLE (s);

    dest = TAILQ_FIRST (&zfpm_g->dest_q);
    if (!dest)
      break;

    assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));

    hdr = (fpm_msg_hdr_t *) buf;
    hdr->version = FPM_PROTO_VERSION;
    hdr->msg_type = FPM_MSG_TYPE_NETLINK;

    data = fpm_msg_data (hdr);

    rib = zfpm_route_for_update (dest);
    is_add = rib ? 1 : 0;

    write_msg = 1;

    /*
     * If this is a route deletion, and we have not sent the route to
     * the FPM previously, skip it.
     */
    if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
      {
        write_msg = 0;
        zfpm_g->stats.nop_deletes_skipped++;
      }

    if (write_msg) {
      data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data);

      assert (data_len);
      if (data_len)
        {
          msg_len = fpm_data_len_to_msg_len (data_len);
          hdr->msg_len = htons (msg_len);
          stream_forward_endp (s, msg_len);

          if (is_add)
            zfpm_g->stats.route_adds++;
          else
            zfpm_g->stats.route_dels++;
        }
    }

    /*
     * Remove the dest from the queue, and reset the flag.
     */
    UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
    TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);

    if (is_add)
      {
        SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }
    else
      {
        UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }

    /*
     * Delete the destination if necessary.
     */
    if (rib_gc_dest (dest->rnode))
      zfpm_g->stats.dests_del_after_update++;

  } while (1);

}
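
/*
 * Each message appended to the output stream above thus consists of
 * an FPM header with msg_type FPM_MSG_TYPE_NETLINK, followed by a
 * netlink-encoded route message (RTM_NEWROUTE for an add,
 * RTM_DELROUTE for a delete), with hdr->msg_len covering the header
 * and payload together.
 */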

/*
 * zfpm_write_cb
 */
static int
zfpm_write_cb (struct thread *thread)
{
  struct stream *s;
  int num_writes;

  zfpm_g->stats.write_cb_calls++;
  assert (zfpm_g->t_write);
  zfpm_g->t_write = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  num_writes = 0;

  do
    {
      int bytes_to_write, bytes_written;

      s = zfpm_g->obuf;

      /*
       * If the stream is empty, try to fill it up with data.
       */
      if (stream_empty (s))
        {
          zfpm_build_updates ();
        }

      bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
      if (!bytes_to_write)
        break;

      bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
      zfpm_g->stats.write_calls++;
      num_writes++;

      if (bytes_written < 0)
        {
          if (ERRNO_IO_RETRY (errno))
            break;

          zfpm_connection_down ("failed to write to socket");
          return 0;
        }

      if (bytes_written != bytes_to_write)
        {

          /*
           * Partial write.
           */
          stream_forward_getp (s, bytes_written);
          zfpm_g->stats.partial_writes++;
          break;
        }

      /*
       * We've written out the entire contents of the stream.
       */
      stream_reset (s);

      if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
        {
          zfpm_g->stats.max_writes_hit++;
          break;
        }

      if (zfpm_thread_should_yield (thread))
        {
          zfpm_g->stats.t_write_yields++;
          break;
        }
    } while (1);

  if (zfpm_writes_pending ())
    zfpm_write_on ();

  return 0;
}

/*
 * zfpm_connect_cb
 */
static int
zfpm_connect_cb (struct thread *t)
{
  int sock, ret;
  struct sockaddr_in serv;

  assert (zfpm_g->t_connect);
  zfpm_g->t_connect = NULL;
  assert (zfpm_g->state == ZFPM_STATE_ACTIVE);

  sock = socket (AF_INET, SOCK_STREAM, 0);
  if (sock < 0)
    {
      zfpm_debug ("Failed to create socket for connect(): %s",
                  strerror (errno));
      zfpm_g->stats.connect_no_sock++;
      return 0;
    }

  set_nonblocking (sock);

  /* Make server socket. */
  memset (&serv, 0, sizeof (serv));
  serv.sin_family = AF_INET;
  serv.sin_port = htons (zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
  serv.sin_len = sizeof (struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
  if (!zfpm_g->fpm_server)
    serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  else
    serv.sin_addr.s_addr = (zfpm_g->fpm_server);

  /*
   * Connect to the FPM.
   */
  zfpm_g->connect_calls++;
  zfpm_g->stats.connect_calls++;
  zfpm_g->last_connect_call_time = zfpm_get_time ();

  ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
  if (ret >= 0)
    {
      zfpm_g->sock = sock;
      zfpm_connection_up ("connect succeeded");
      return 1;
    }

  if (errno == EINPROGRESS)
    {
      zfpm_g->sock = sock;
      zfpm_read_on ();
      zfpm_write_on ();
      zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
      return 0;
    }

  zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
  close (sock);

  /*
   * Restart timer for retrying connection.
   */
  zfpm_start_connect_timer ("connect() failed");
  return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void
zfpm_set_state (zfpm_state_t state, const char *reason)
{
  zfpm_state_t cur_state = zfpm_g->state;

  if (!reason)
    reason = "Unknown";

  if (state == cur_state)
    return;

  zfpm_debug ("beginning state transition %s -> %s. Reason: %s",
              zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
              reason);

  switch (state) {

  case ZFPM_STATE_IDLE:
    assert (cur_state == ZFPM_STATE_ESTABLISHED);
    break;

  case ZFPM_STATE_ACTIVE:
    assert (cur_state == ZFPM_STATE_IDLE ||
            cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->t_connect);
    break;

  case ZFPM_STATE_CONNECTING:
    assert (zfpm_g->sock);
    assert (cur_state == ZFPM_STATE_ACTIVE);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;

  case ZFPM_STATE_ESTABLISHED:
    assert (cur_state == ZFPM_STATE_ACTIVE ||
            cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->sock);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;
  }

  zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long
zfpm_calc_connect_delay (void)
{
  time_t elapsed;

  /*
   * Return 0 if this is our first attempt to connect.
   */
  if (zfpm_g->connect_calls == 0)
    {
      return 0;
    }

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);

  if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
    return 0;
  }

  return ZFPM_CONNECT_RETRY_IVL - elapsed;
}
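
/*
 * For example, with ZFPM_CONNECT_RETRY_IVL set to 5 seconds: if the
 * last connect() attempt was 2 seconds ago, the next attempt is
 * scheduled 3 seconds from now; if the last attempt was more than 5
 * seconds ago, or there has been no attempt yet, we connect right
 * away.
 */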

/*
 * zfpm_start_connect_timer
 */
static void
zfpm_start_connect_timer (const char *reason)
{
  long delay_secs;

  assert (!zfpm_g->t_connect);
  assert (zfpm_g->sock < 0);

  assert (zfpm_g->state == ZFPM_STATE_IDLE ||
          zfpm_g->state == ZFPM_STATE_ACTIVE ||
          zfpm_g->state == ZFPM_STATE_CONNECTING);

  delay_secs = zfpm_calc_connect_delay ();
  zfpm_debug ("scheduling connect in %ld seconds", delay_secs);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
                   delay_secs);
  zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int
zfpm_is_enabled (void)
{
  return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int
zfpm_conn_is_up (void)
{
  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    return 0;

  assert (zfpm_g->sock >= 0);

  return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
void
zfpm_trigger_update (struct route_node *rn, const char *reason)
{
  rib_dest_t *dest;
  char buf[INET6_ADDRSTRLEN];

  /*
   * Ignore if the connection is down. We will update the FPM about
   * all destinations once the connection comes up.
   */
  if (!zfpm_conn_is_up ())
    return;

  dest = rib_dest_from_rnode (rn);

  /*
   * Ignore the trigger if the dest is not in a table that we would
   * send to the FPM.
   */
  if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
    {
      zfpm_g->stats.non_fpm_table_triggers++;
      return;
    }

  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
    zfpm_g->stats.redundant_triggers++;
    return;
  }

  if (reason)
    {
      zfpm_debug ("%s/%d triggering update to FPM - Reason: %s",
                  inet_ntop (rn->p.family, &rn->p.u.prefix, buf, sizeof (buf)),
                  rn->p.prefixlen, reason);
    }

  SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
  zfpm_g->stats.updates_triggered++;

  /*
   * Make sure that writes are enabled.
   */
  if (zfpm_g->t_write)
    return;

  zfpm_write_on ();
}

/*
 * zfpm_stats_timer_cb
 */
static int
zfpm_stats_timer_cb (struct thread *t)
{
  assert (zfpm_g->t_stats);
  zfpm_g->t_stats = NULL;

  /*
   * Remember the stats collected in the last interval for display
   * purposes.
   */
  zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);

  /*
   * Add the current set of stats into the cumulative statistics.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &zfpm_g->cumulative_stats);

  /*
   * Start collecting stats afresh over the next interval.
   */
  zfpm_stats_reset (&zfpm_g->stats);

  zfpm_start_stats_timer ();

  return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void
zfpm_stop_stats_timer (void)
{
  if (!zfpm_g->t_stats)
    return;

  zfpm_debug ("Stopping existing stats timer");
  THREAD_TIMER_OFF (zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
static void
zfpm_start_stats_timer (void)
{
  assert (!zfpm_g->t_stats);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
                   ZFPM_STATS_IVL_SECS);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter) \
  do { \
    vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
             zfpm_g->last_ivl_stats.counter, VTY_NEWLINE); \
  } while (0)

/*
 * zfpm_show_stats
 */
static void
zfpm_show_stats (struct vty *vty)
{
  zfpm_stats_t total_stats;
  time_t elapsed;

  vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
           "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);

  /*
   * Compute the total stats up to this instant.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &total_stats);

  ZFPM_SHOW_STAT (connect_calls);
  ZFPM_SHOW_STAT (connect_no_sock);
  ZFPM_SHOW_STAT (read_cb_calls);
  ZFPM_SHOW_STAT (write_cb_calls);
  ZFPM_SHOW_STAT (write_calls);
  ZFPM_SHOW_STAT (partial_writes);
  ZFPM_SHOW_STAT (max_writes_hit);
  ZFPM_SHOW_STAT (t_write_yields);
  ZFPM_SHOW_STAT (nop_deletes_skipped);
  ZFPM_SHOW_STAT (route_adds);
  ZFPM_SHOW_STAT (route_dels);
  ZFPM_SHOW_STAT (updates_triggered);
  ZFPM_SHOW_STAT (non_fpm_table_triggers);
  ZFPM_SHOW_STAT (redundant_triggers);
  ZFPM_SHOW_STAT (dests_del_after_update);
  ZFPM_SHOW_STAT (t_conn_down_starts);
  ZFPM_SHOW_STAT (t_conn_down_dests_processed);
  ZFPM_SHOW_STAT (t_conn_down_yields);
  ZFPM_SHOW_STAT (t_conn_down_finishes);
  ZFPM_SHOW_STAT (t_conn_up_starts);
  ZFPM_SHOW_STAT (t_conn_up_dests_processed);
  ZFPM_SHOW_STAT (t_conn_up_yields);
  ZFPM_SHOW_STAT (t_conn_up_aborts);
  ZFPM_SHOW_STAT (t_conn_up_finishes);

  if (!zfpm_g->last_stats_clear_time)
    return;

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);

  vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
           (unsigned long) elapsed, VTY_NEWLINE);
}

/*
 * zfpm_clear_stats
 */
static void
zfpm_clear_stats (struct vty *vty)
{
  if (!zfpm_is_enabled ())
    {
      vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
      return;
    }

  zfpm_stats_reset (&zfpm_g->stats);
  zfpm_stats_reset (&zfpm_g->last_ivl_stats);
  zfpm_stats_reset (&zfpm_g->cumulative_stats);

  zfpm_stop_stats_timer ();
  zfpm_start_stats_timer ();

  zfpm_g->last_stats_clear_time = zfpm_get_time ();

  vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_show_stats (vty);
  return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_clear_stats (vty);
  return CMD_SUCCESS;
}
/*
 * Update FPM connection information.
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port <1-65535>",
       "fpm connection remote ip and port\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip\n")
{
  in_addr_t fpm_server;
  uint32_t port_no;

  fpm_server = inet_addr (argv[0]);
  if (fpm_server == INADDR_NONE)
    return CMD_ERR_INCOMPLETE;

  port_no = atoi (argv[1]);
  if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
    return CMD_ERR_INCOMPLETE;

  if (zfpm_g->fpm_server == fpm_server &&
      zfpm_g->fpm_port == port_no)
    goto cmd_success;

  zfpm_g->fpm_server = fpm_server;
  zfpm_g->fpm_port = port_no;

  if (zfpm_conn_is_up ())
    zfpm_connection_down ("Restarting to use new connection settings");

cmd_success:
  return CMD_SUCCESS;
}
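
/*
 * Example: the following configuration directs zebra to connect to an
 * FPM instance at 10.0.0.1, TCP port 2620 (both values purely
 * illustrative):
 *
 *   fpm connection ip 10.0.0.1 port 2620
 */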

DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port <1-65535>",
       "fpm connection remote ip and port\n"
       "Connection\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip\n")
{
  if (zfpm_g->fpm_server != inet_addr (argv[0]) ||
      zfpm_g->fpm_port != atoi (argv[1]))
    return CMD_ERR_NO_MATCH;

  zfpm_g->fpm_server = FPM_DEFAULT_IP;
  zfpm_g->fpm_port = FPM_DEFAULT_PORT;

  if (zfpm_conn_is_up ())
    zfpm_connection_down ("Reverting back to default FPM connection");

  return CMD_SUCCESS;
}

/**
 * fpm_remote_srv_write
 *
 * Writes the remote FPM connection configuration, if any, to the
 * given vty.
 *
 * Returns ZERO on success.
 */
int fpm_remote_srv_write (struct vty *vty)
{
  struct in_addr in;

  in.s_addr = zfpm_g->fpm_server;

  if (zfpm_g->fpm_server != FPM_DEFAULT_IP ||
      zfpm_g->fpm_port != FPM_DEFAULT_PORT)
    vty_out (vty, "fpm connection ip %s port %d%s", inet_ntoa (in),
             zfpm_g->fpm_port, VTY_NEWLINE);

  return 0;
}

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] port port at which FPM is running.
 * @param[in] enable TRUE if the zebra FPM module should be enabled
 *
 * Returns TRUE on success.
 */
int
zfpm_init (struct thread_master *master, int enable, uint16_t port)
{
  static int initialized = 0;

  if (initialized) {
    return 1;
  }

  initialized = 1;

  memset (zfpm_g, 0, sizeof (*zfpm_g));
  zfpm_g->master = master;
  TAILQ_INIT (&zfpm_g->dest_q);
  zfpm_g->sock = -1;
  zfpm_g->state = ZFPM_STATE_IDLE;

  /*
   * Netlink must currently be available for the Zebra-FPM interface
   * to be enabled.
   */
#ifndef HAVE_NETLINK
  enable = 0;
#endif

  zfpm_g->enabled = enable;

  zfpm_stats_init (&zfpm_g->stats);
  zfpm_stats_init (&zfpm_g->last_ivl_stats);
  zfpm_stats_init (&zfpm_g->cumulative_stats);

  install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
  install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
  install_element (CONFIG_NODE, &fpm_remote_ip_cmd);
  install_element (CONFIG_NODE, &no_fpm_remote_ip_cmd);

  if (!enable) {
    return 1;
  }

  if (!zfpm_g->fpm_server)
    zfpm_g->fpm_server = FPM_DEFAULT_IP;

  if (!port)
    port = FPM_DEFAULT_PORT;

  zfpm_g->fpm_port = port;

  zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
  zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);

  zfpm_start_stats_timer ();
  zfpm_start_connect_timer ("initialized");

  return 1;
}