Initial commit
Change-Id: I6a4444e3c193dae437cd7929f4c39aba7b749efa
diff --git a/extensions/app_acct/acct_db.c b/extensions/app_acct/acct_db.c
new file mode 100644
index 0000000..a444c94
--- /dev/null
+++ b/extensions/app_acct/acct_db.c
@@ -0,0 +1,341 @@
+/*********************************************************************************************************
+* Software License Agreement (BSD License) *
+* Author: Sebastien Decugis <sdecugis@freediameter.net> *
+* *
+* Copyright (c) 2013, WIDE Project and NICT *
+* All rights reserved. *
+* *
+* Redistribution and use of this software in source and binary forms, with or without modification, are *
+* permitted provided that the following conditions are met: *
+* *
+* * Redistributions of source code must retain the above *
+* copyright notice, this list of conditions and the *
+* following disclaimer. *
+* *
+* * Redistributions in binary form must reproduce the above *
+* copyright notice, this list of conditions and the *
+* following disclaimer in the documentation and/or other *
+* materials provided with the distribution. *
+* *
+* * Neither the name of the WIDE Project or NICT nor the *
+* names of its contributors may be used to endorse or *
+* promote products derived from this software without *
+* specific prior written permission of WIDE Project and *
+* NICT. *
+* *
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
+* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
+* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
+* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
+* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
+* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
+* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
+*********************************************************************************************************/
+
+/* Database interface module */
+
+/* There is one connection to the db per thread.
+Each thread's connection is stored under the pthread_key_t variable connk. */
+
+
+#include "app_acct.h"
+#include <libpq-fe.h>
+
+/* Name of the PostgreSQL type used for each AVP base type. The table is indexed by the AVP type
+   value (see the "$n::type" casts built in acct_db_init); grouped AVPs map to an empty string. */
+const char * diam2db_types_mapping[AVP_TYPE_MAX + 1] = {
+	/* AVP_TYPE_GROUPED */            "",
+	/* AVP_TYPE_OCTETSTRING */        "bytea",
+	/* AVP_TYPE_INTEGER32 */          "integer",
+	/* AVP_TYPE_INTEGER64 */          "bigint",
+	/* AVP_TYPE_UNSIGNED32 + cast */  "integer",
+	/* AVP_TYPE_UNSIGNED64 + cast */  "bigint",
+	/* AVP_TYPE_FLOAT32 */            "real",
+	/* AVP_TYPE_FLOAT64 */            "double precision"
+};
+
+/* Name under which the INSERT statement is prepared (PQprepare) and later executed (PQexecPrepared) */
+static const char * stmt = "acct_db_stmt";
+/* Thread-specific key under which each thread keeps its own PGconn.
+   It has internal linkage unless TEST_DEBUG is defined, in which case it is visible outside this file. */
+#ifndef TEST_DEBUG
+static
+#endif /* TEST_DEBUG */
+pthread_key_t connk;
+static char * sql = NULL; /* The buffer that will contain the SQL query */
+/* Value of emptyrecords.nball saved by acct_db_init; passed as nParams when acct_db_insert prepares the statement */
+static int nbrecords = 0;
+
+
+/*
+ * Initialize the database context: connect to the server described by acct_config->conninfo,
+ * build the text of the INSERT statement from the configured fields, prepare it on that connection,
+ * then create the thread-specific key connk and store the connection under it for the calling thread.
+ *
+ * Returns 0 on success. On failure it returns EINVAL (database or statement problem), ENOMEM
+ * (SQL buffer allocation), or the error reported by acct_rec_prepare / the pthread calls.
+ * Everything acquired here (connection, record list, SQL buffer, key) is released before a
+ * failure is returned; acct_db_free() is deliberately not used for that, because connk does
+ * not exist yet on most of these paths.
+ */
+int acct_db_init(void)
+{
+	struct acct_record_list emptyrecords;
+	struct fd_list * li;
+	size_t sql_allocd = 0; /* The malloc'd size of the buffer */
+	size_t sql_offset = 0; /* The actual data already written in this buffer */
+	int idx = 0;           /* Number of the last $n placeholder written */
+	int ret = 0;           /* Error code returned by the cleanup path */
+	int have_records = 0;  /* Non-zero while emptyrecords must still be released */
+	PGresult * res;
+	PGconn *conn;
+	#define REALLOC_SIZE 1024 /* We extend the buffer by this amount */
+
+	TRACE_ENTRY();
+	CHECK_PARAMS( acct_config && acct_config->conninfo && acct_config->tablename );
+
+	CHECK_PARAMS_DO( PQisthreadsafe() == 1, {
+		fd_log_debug("You PostGreSQL installation is not thread-safe!");
+		return EINVAL;
+	} );
+
+	/* Use the information from acct_config to create the connection and prepare the query */
+	conn = PQconnectdb(acct_config->conninfo);
+
+	/* Check to see that the backend connection was successfully made */
+	if (PQstatus(conn) != CONNECTION_OK) {
+		fd_log_debug("Connection to database failed: %s", PQerrorMessage(conn));
+		ret = EINVAL;
+		goto error;
+	}
+	if (PQprotocolVersion(conn) < 3) {
+		fd_log_debug("Database protocol version is too old, version 3 is required for prepared statements.");
+		ret = EINVAL;
+		goto error;
+	}
+
+	TRACE_DEBUG(FULL, "Connection to database successful, server version %d.", PQserverVersion(conn));
+
+	/* Now, prepare the request object */
+
+	/* First, we build the list of AVP we will insert in the database */
+	ret = acct_rec_prepare(&emptyrecords);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to build the list of records: %s", strerror(ret));
+		goto error;
+	}
+	have_records = 1;
+
+	/* Now, prepare the text of the request */
+	sql = malloc(REALLOC_SIZE);
+	if (sql == NULL) {
+		TRACE_DEBUG(INFO, "Unable to allocate the SQL buffer");
+		ret = ENOMEM;
+		goto error;
+	}
+	sql_allocd = REALLOC_SIZE;
+
+	/* Grow the SQL buffer by REALLOC_SIZE. The previous pointer is only replaced once realloc
+	   succeeded, so the cleanup path can still free it on failure. */
+	#define GROW_SQL() { \
+		char * grown_ = realloc(sql, sql_allocd + REALLOC_SIZE); \
+		if (grown_ == NULL) { \
+			TRACE_DEBUG(INFO, "Unable to extend the SQL buffer"); \
+			ret = ENOMEM; \
+			goto error; \
+		} \
+		sql = grown_; \
+		sql_allocd += REALLOC_SIZE; \
+	}
+
+	/* This macro hides the details of extending the buffer on each sprintf... */
+	#define ADD_EXTEND(args...) { \
+		int loop = 0; \
+		for (;;) { \
+			int p = snprintf(sql + sql_offset, sql_allocd - sql_offset, ##args); \
+			if (p < 0) { \
+				/* snprintf failed: a negative count must not be used as a length */ \
+				ret = EINVAL; \
+				goto error; \
+			} \
+			if ((size_t)p < sql_allocd - sql_offset) { \
+				sql_offset += (size_t)p; \
+				break; \
+			} \
+			/* Too short, extend the buffer and start again */ \
+			GROW_SQL(); \
+			loop++; \
+			ASSERT(loop < 100); /* detect infinite loops */ \
+		} \
+	}
+
+	/* This macro adds a value to the SQL buffer while escaping it properly.
+	   NOTE(review): PQescapeStringConn performs string-literal escaping, yet it is also applied below
+	   to names placed between double quotes, and the table name is not escaped at all. The values
+	   come from the configuration; confirm whether identifier escaping is wanted there. */
+	#define ADD_ESCAPE(str) { \
+		const char * src_ = (const char *)(str); \
+		size_t len_ = strlen(src_); \
+		/* The escaped form needs at most twice the input length, plus the terminator */ \
+		while (sql_allocd - sql_offset < 2 * len_ + 1) { \
+			GROW_SQL(); \
+		} \
+		sql_offset += PQescapeStringConn(conn, sql + sql_offset, src_, len_, NULL); \
+	}
+
+	/* INSERT INTO table (tsfield, field1, field2, ...) VALUES (now, $1::bytea, $2::integer, ...) */
+	ADD_EXTEND("INSERT INTO %s (", acct_config->tablename);
+
+	if (acct_config->tsfield) {
+		ADD_EXTEND("\"");
+		ADD_ESCAPE(acct_config->tsfield);
+		ADD_EXTEND("\", ");
+	}
+
+	if (acct_config->srvnfield) {
+		ADD_EXTEND("\"");
+		ADD_ESCAPE(acct_config->srvnfield);
+		ADD_EXTEND("\", ");
+	}
+
+	/* One column per record item: the configured field name, or the AVP name when no field is set,
+	   followed by the item index when it is not zero */
+	for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
+		struct acct_record_item * i = (struct acct_record_item *)(li->o);
+		ADD_EXTEND("\"");
+		ADD_ESCAPE(i->param->field?:i->param->avpname);
+		if (i->index) {
+			ADD_EXTEND("%d", i->index);
+		}
+		if (li->next != &emptyrecords.all) {
+			ADD_EXTEND("\", ");
+		}
+	}
+
+	ADD_EXTEND("\") VALUES (");
+
+	/* The timestamp is the first placeholder; the server name is written as a literal */
+	if (acct_config->tsfield) {
+		++idx;
+		ADD_EXTEND("$%d, ", idx);
+	}
+	if (acct_config->srvnfield) {
+		ADD_EXTEND("'");
+		ADD_ESCAPE(fd_g_config->cnf_diamid);
+		ADD_EXTEND("', ");
+	}
+
+	/* One typed placeholder per record item, in the same order as the columns above */
+	for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
+		struct acct_record_item * i = (struct acct_record_item *)(li->o);
+		++idx;
+		ADD_EXTEND("$%d::%s", idx, diam2db_types_mapping[i->param->avptype]);
+
+		if (li->next != &emptyrecords.all) {
+			ADD_EXTEND(", ");
+		}
+	}
+
+	ADD_EXTEND(");");
+
+	TRACE_DEBUG(FULL, "Preparing the following SQL statement: '%s'", sql);
+	res = PQprepare(conn, stmt, sql, emptyrecords.nball, NULL);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+		TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
+				sql, PQerrorMessage(conn));
+		PQclear(res);
+		ret = EINVAL;
+		goto error;
+	}
+	PQclear(res);
+	nbrecords = emptyrecords.nball;
+
+	acct_rec_empty(&emptyrecords);
+	have_records = 0;
+
+	/* Create the key (PQfinish is its destructor) and hand the connection over to it */
+	ret = pthread_key_create(&connk, (void (*)(void*))PQfinish);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to create the thread-specific key: %s", strerror(ret));
+		goto error;
+	}
+	ret = pthread_setspecific(connk, conn);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to store the database connection: %s", strerror(ret));
+		(void)pthread_key_delete(connk);
+		goto error;
+	}
+
+	/* Ok, ready */
+	return 0;
+
+error:
+	/* Release whatever was acquired so far; conn is still owned by this function here */
+	if (have_records) {
+		acct_rec_empty(&emptyrecords);
+	}
+	free(sql);
+	sql = NULL;
+	PQfinish(conn);
+	return ret;
+}
+
+/*
+ * Release the module-wide resources: delete the thread-specific key connk and free the SQL text.
+ * NOTE(review): POSIX pthread_key_delete() does not invoke the key destructor, so connections
+ * still stored under the key are presumably not closed by this function -- to be confirmed.
+ */
+void acct_db_free(void)
+{
+	CHECK_POSIX_DO(pthread_key_delete(connk) , );
+	free(sql);
+	sql = NULL; /* acct_db_insert reads this pointer, and a second call must not free it again */
+}
+
+/*
+ * When a new message has been received, insert the content of the parsed mapping into the DB
+ * (using the prepared statement).
+ *
+ * Each thread works on its own connection, kept under connk. The connection is opened on the first
+ * call made by a thread, and the statement is prepared on it because prepared statements are not
+ * shared between connections. A connection that had to be reset is a new session for the server,
+ * so the statement is prepared again on it as well.
+ *
+ * records: the list to insert. An item without a value leaves a null pointer in the parameter
+ *          array, which libpq sends as SQL NULL.
+ * Returns 0 on success, ENOTCONN when the server cannot be reached, ENOMEM when the parameter
+ * arrays cannot be allocated, EINVAL when preparing or executing the statement fails, or the error
+ * reported by pthread_setspecific().
+ */
+int acct_db_insert(struct acct_record_list * records)
+{
+	char **val;
+	int *val_len;
+	int *val_isbin;
+	int idx = 0;
+	int size = 0;
+	int ret;
+	PGresult *res;
+	struct fd_list *li;
+	PGconn *conn;
+	int new = 0; /* Set when the statement must be created on this connection */
+
+	TRACE_ENTRY("%p", records);
+	CHECK_PARAMS( records );
+
+	conn = pthread_getspecific(connk);
+	if (!conn) {
+		/* First call from this thread: open its own connection */
+		conn = PQconnectdb(acct_config->conninfo);
+		ret = pthread_setspecific(connk, conn);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "Unable to store the database connection for this thread: %s", strerror(ret));
+			PQfinish(conn); /* nothing else references it, so close it instead of leaking it */
+			return ret;
+		}
+
+		new = 1;
+	}
+
+	/* First, check if the connection with the DB has not staled, and eventually try to fix it */
+	if (PQstatus(conn) != CONNECTION_OK) {
+		/* Attempt a reset */
+		PQreset(conn);
+		if (PQstatus(conn) != CONNECTION_OK) {
+			TRACE_DEBUG(INFO, "Lost connection to the database server, and attempt to reestablish it failed");
+			TODO("Terminate the freeDiameter instance completely?");
+			return ENOTCONN;
+		}
+		/* The reset opened a new session: the statement prepared on the old one is gone */
+		new = 1;
+	}
+
+	if (new) {
+		/* Create the prepared statement for this connection, it is not shared */
+		res = PQprepare(conn, stmt, sql, nbrecords, NULL);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+			TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
+					sql, PQerrorMessage(conn));
+			PQclear(res);
+			return EINVAL;
+		}
+		PQclear(res);
+	}
+
+	/* One parameter per record item, plus one for the timestamp when it is configured */
+	size = acct_config->tsfield ? records->nball + 1 : records->nball;
+
+	/* Alloc the arrays of parameters; on failure release the ones that were obtained */
+	val = calloc(size, sizeof(*val));
+	val_len = calloc(size, sizeof(*val_len));
+	val_isbin = calloc(size, sizeof(*val_isbin));
+	if (!val || !val_len || !val_isbin) {
+		TRACE_DEBUG(INFO, "Unable to allocate the arrays of parameters");
+		free(val);
+		free(val_len);
+		free(val_isbin);
+		return ENOMEM;
+	}
+
+	if (acct_config->tsfield) {
+		/* The timestamp is the only parameter passed in text format */
+		val[idx] = "now";
+		val_len[idx] = 3;
+		val_isbin[idx] = 0;
+		idx++;
+	}
+
+	/* Now write all the map'd records in these arrays */
+	for (li = records->all.next; li != &records->all; li = li->next) {
+		struct acct_record_item * r = (struct acct_record_item *)(li->o);
+		if (r->value) {
+			val_isbin[idx] = 1; /* We always pass binary parameters */
+			switch (r->param->avptype) {
+				case AVP_TYPE_OCTETSTRING:
+					val[idx] = (void *)(r->value->os.data);
+					val_len[idx] = r->value->os.len;
+					break;
+
+				/* Scalars are converted with htonl / htonll into the item's own
+				   scratch field, and the parameter points to that field */
+				case AVP_TYPE_INTEGER32:
+				case AVP_TYPE_UNSIGNED32:
+				case AVP_TYPE_FLOAT32:
+					r->scalar.v32 = htonl(r->value->u32);
+					val[idx] = &r->scalar.c;
+					val_len[idx] = sizeof(uint32_t);
+					break;
+
+				case AVP_TYPE_INTEGER64:
+				case AVP_TYPE_UNSIGNED64:
+				case AVP_TYPE_FLOAT64:
+					r->scalar.v64 = htonll(r->value->u64);
+					val[idx] = &r->scalar.c;
+					val_len[idx] = sizeof(uint64_t);
+					break;
+
+				default:
+					ASSERT(0); /* detect bugs */
+			}
+		}
+
+		idx++;
+	}
+
+	/* OK, now execute the SQL statement */
+	res = PQexecPrepared(conn, stmt, size, (const char * const *)val, val_len, val_isbin, 1 /* We actually don't care here */);
+
+	/* Done with the parameters */
+	free(val);
+	free(val_len);
+	free(val_isbin);
+
+	/* Now check the result code */
+	if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+		TRACE_DEBUG(INFO, "An error occurred while INSERTing in the database: %s", PQerrorMessage(conn));
+		PQclear(res);
+		return EINVAL; /* It was probably a mistake in configuration file... */
+	}
+	PQclear(res);
+
+	/* Ok, we are done */
+	return 0;
+}
+
+