blob: a444c94e01828a924c95893b2f9eeb5c7759737d [file] [log] [blame]
/*********************************************************************************************************
* Software License Agreement (BSD License) *
* Author: Sebastien Decugis <sdecugis@freediameter.net> *
* *
* Copyright (c) 2013, WIDE Project and NICT *
* All rights reserved. *
* *
* Redistribution and use of this software in source and binary forms, with or without modification, are *
* permitted provided that the following conditions are met: *
* *
* * Redistributions of source code must retain the above *
* copyright notice, this list of conditions and the *
* following disclaimer. *
* *
* * Redistributions in binary form must reproduce the above *
* copyright notice, this list of conditions and the *
* following disclaimer in the documentation and/or other *
* materials provided with the distribution. *
* *
* * Neither the name of the WIDE Project or NICT nor the *
* names of its contributors may be used to endorse or *
* promote products derived from this software without *
* specific prior written permission of WIDE Project and *
* NICT. *
* *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
*********************************************************************************************************/
/* Database interface module */
/* There is one connection to the db per thread.
The connection is stored in the pthread_key_t variable */
#include "app_acct.h"
#include <libpq-fe.h>
const char * diam2db_types_mapping[AVP_TYPE_MAX + 1] = {
"" /* AVP_TYPE_GROUPED */,
"bytea" /* AVP_TYPE_OCTETSTRING */,
"integer" /* AVP_TYPE_INTEGER32 */,
"bigint" /* AVP_TYPE_INTEGER64 */,
"integer" /* AVP_TYPE_UNSIGNED32 + cast */,
"bigint" /* AVP_TYPE_UNSIGNED64 + cast */,
"real" /* AVP_TYPE_FLOAT32 */,
"double precision" /* AVP_TYPE_FLOAT64 */
};
static const char * stmt = "acct_db_stmt";
#ifndef TEST_DEBUG
static
#endif /* TEST_DEBUG */
pthread_key_t connk;
static char * sql = NULL; /* The buffer that will contain the SQL query */
static int nbrecords = 0;
/* Initialize the database context: connection to the DB, prepared statement to insert new records */
int acct_db_init(void)
{
struct acct_record_list emptyrecords;
struct fd_list * li;
size_t sql_allocd = 0; /* The malloc'd size of the buffer */
size_t sql_offset = 0; /* The actual data already written in this buffer */
int idx = 0;
PGresult * res;
PGconn *conn;
#define REALLOC_SIZE 1024 /* We extend the buffer by this amount */
TRACE_ENTRY();
CHECK_PARAMS( acct_config && acct_config->conninfo && acct_config->tablename );
CHECK_PARAMS_DO( PQisthreadsafe() == 1, {
fd_log_debug("You PostGreSQL installation is not thread-safe!");
return EINVAL;
} );
/* Use the information from acct_config to create the connection and prepare the query */
conn = PQconnectdb(acct_config->conninfo);
/* Check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK) {
fd_log_debug("Connection to database failed: %s", PQerrorMessage(conn));
acct_db_free();
return EINVAL;
}
if (PQprotocolVersion(conn) < 3) {
fd_log_debug("Database protocol version is too old, version 3 is required for prepared statements.");
acct_db_free();
return EINVAL;
}
TRACE_DEBUG(FULL, "Connection to database successful, server version %d.", PQserverVersion(conn));
/* Now, prepare the request object */
/* First, we build the list of AVP we will insert in the database */
CHECK_FCT( acct_rec_prepare(&emptyrecords) );
/* Now, prepare the text of the request */
CHECK_MALLOC(sql = malloc(REALLOC_SIZE));
sql_allocd = REALLOC_SIZE;
/* This macro hides the details of extending the buffer on each sprintf... */
#define ADD_EXTEND(args...) { \
size_t p; \
int loop = 0; \
do { \
p = snprintf(sql + sql_offset, sql_allocd - sql_offset, ##args); \
if (p >= sql_allocd - sql_offset) { \
/* Too short, extend the buffer and start again */ \
CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) ); \
sql_allocd += REALLOC_SIZE; \
loop++; \
ASSERT(loop < 100); /* detect infinite loops */ \
continue; \
} \
sql_offset += p; \
break; \
} while (1); \
}
/* This macro allows to add a value in the SQL buffer while escaping in properly */
#define ADD_ESCAPE(str) { \
char * __s = (char *)str; \
/* Check we have at least twice the size available +1 */ \
size_t p = strlen(__s); \
\
while (sql_allocd - sql_offset < 2 * p + 1) { \
/* Too short, extend the buffer */ \
CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) ); \
sql_allocd += REALLOC_SIZE; \
} \
\
/* Now add the escaped string */ \
p = PQescapeStringConn(conn, sql+sql_offset, __s, p, NULL); \
sql_offset += p; \
}
/* INSERT INTO table (tsfield, field1, field2, ...) VALUES (now, $1::bytea, $2::integer, ...) */
ADD_EXTEND("INSERT INTO %s (", acct_config->tablename);
if (acct_config->tsfield) {
ADD_EXTEND("\"");
ADD_ESCAPE(acct_config->tsfield);
ADD_EXTEND("\", ");
}
if (acct_config->srvnfield) {
ADD_EXTEND("\"");
ADD_ESCAPE(acct_config->srvnfield);
ADD_EXTEND("\", ");
}
for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
struct acct_record_item * i = (struct acct_record_item *)(li->o);
ADD_EXTEND("\"");
ADD_ESCAPE(i->param->field?:i->param->avpname);
if (i->index) {
ADD_EXTEND("%d", i->index);
}
if (li->next != &emptyrecords.all) {
ADD_EXTEND("\", ");
}
}
ADD_EXTEND("\") VALUES (");
if (acct_config->tsfield) {
++idx;
ADD_EXTEND("$%d, ", idx);
}
if (acct_config->srvnfield) {
ADD_EXTEND("'");
ADD_ESCAPE(fd_g_config->cnf_diamid);
ADD_EXTEND("', ");
}
for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
struct acct_record_item * i = (struct acct_record_item *)(li->o);
++idx;
ADD_EXTEND("$%d::%s", idx, diam2db_types_mapping[i->param->avptype]);
if (li->next != &emptyrecords.all) {
ADD_EXTEND(", ");
}
}
ADD_EXTEND(");");
TRACE_DEBUG(FULL, "Preparing the following SQL statement: '%s'", sql);
res = PQprepare(conn, stmt, sql, emptyrecords.nball, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
sql, PQerrorMessage(conn));
PQclear(res);
return EINVAL;
}
PQclear(res);
nbrecords = emptyrecords.nball;
acct_rec_empty(&emptyrecords);
CHECK_POSIX( pthread_key_create(&connk, (void (*)(void*))PQfinish) );
CHECK_POSIX( pthread_setspecific(connk, conn) );
/* Ok, ready */
return 0;
}
/* Terminate the connection to the DB */
void acct_db_free(void)
{
CHECK_POSIX_DO(pthread_key_delete(connk) , );
free(sql);
}
/* When a new message has been received, insert the content of the parsed mapping into the DB (using prepared statement) */
int acct_db_insert(struct acct_record_list * records)
{
char **val;
int *val_len;
int *val_isbin;
int idx = 0;
int size = 0;
PGresult *res;
struct fd_list *li;
PGconn *conn;
int new = 0;
TRACE_ENTRY("%p", records);
CHECK_PARAMS( records );
conn = pthread_getspecific(connk);
if (!conn) {
conn = PQconnectdb(acct_config->conninfo);
CHECK_POSIX( pthread_setspecific(connk, conn) );
new = 1;
}
/* First, check if the connection with the DB has not staled, and eventually try to fix it */
if (PQstatus(conn) != CONNECTION_OK) {
/* Attempt a reset */
PQreset(conn);
if (PQstatus(conn) != CONNECTION_OK) {
TRACE_DEBUG(INFO, "Lost connection to the database server, and attempt to reestablish it failed");
TODO("Terminate the freeDiameter instance completely?");
return ENOTCONN;
}
}
if (new) {
/* Create the prepared statement for this ocnnection, it is not shared */
res = PQprepare(conn, stmt, sql, nbrecords, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
sql, PQerrorMessage(conn));
PQclear(res);
return EINVAL;
}
PQclear(res);
}
size = acct_config->tsfield ? records->nball + 1 : records->nball;
/* Alloc the arrays of parameters */
CHECK_MALLOC( val = calloc(size, sizeof(const char *)) );
CHECK_MALLOC( val_len = calloc(size, sizeof(const int)) );
CHECK_MALLOC( val_isbin = calloc(size, sizeof(const int)) );
if (acct_config->tsfield) {
val[idx] = "now";
val_len[idx] = 3;
val_isbin[idx] = 0;
idx++;
}
/* Now write all the map'd records in these arrays */
for (li = records->all.next; li != &records->all; li = li->next) {
struct acct_record_item * r = (struct acct_record_item *)(li->o);
if (r->value) {
val_isbin[idx] = 1; /* We always pass binary parameters */
switch (r->param->avptype) {
case AVP_TYPE_OCTETSTRING:
val[idx] = (void *)(r->value->os.data);
val_len[idx] = r->value->os.len;
break;
case AVP_TYPE_INTEGER32:
case AVP_TYPE_UNSIGNED32:
case AVP_TYPE_FLOAT32:
r->scalar.v32 = htonl(r->value->u32);
val[idx] = &r->scalar.c;
val_len[idx] = sizeof(uint32_t);
break;
case AVP_TYPE_INTEGER64:
case AVP_TYPE_UNSIGNED64:
case AVP_TYPE_FLOAT64:
r->scalar.v64 = htonll(r->value->u64);
val[idx] = &r->scalar.c;
val_len[idx] = sizeof(uint64_t);
break;
default:
ASSERT(0); /* detect bugs */
}
}
idx++;
}
/* OK, now execute the SQL statement */
res = PQexecPrepared(conn, stmt, size, (const char * const *)val, val_len, val_isbin, 1 /* We actually don't care here */);
/* Done with the parameters */
free(val);
free(val_len);
free(val_isbin);
/* Now check the result code */
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
TRACE_DEBUG(INFO, "An error occurred while INSERTing in the database: %s", PQerrorMessage(conn));
PQclear(res);
return EINVAL; /* It was probably a mistake in configuration file... */
}
PQclear(res);
/* Ok, we are done */
return 0;
}