Brian Waters | 13d9601 | 2017-12-08 16:53:31 -0600 | [diff] [blame] | 1 | /********************************************************************************************************* |
| 2 | * Software License Agreement (BSD License) * |
| 3 | * Author: Sebastien Decugis <sdecugis@freediameter.net> * |
| 4 | * * |
| 5 | * Copyright (c) 2013, WIDE Project and NICT * |
| 6 | * All rights reserved. * |
| 7 | * * |
| 8 | * Redistribution and use of this software in source and binary forms, with or without modification, are * |
| 9 | * permitted provided that the following conditions are met: * |
| 10 | * * |
| 11 | * * Redistributions of source code must retain the above * |
| 12 | * copyright notice, this list of conditions and the * |
| 13 | * following disclaimer. * |
| 14 | * * |
| 15 | * * Redistributions in binary form must reproduce the above * |
| 16 | * copyright notice, this list of conditions and the * |
| 17 | * following disclaimer in the documentation and/or other * |
| 18 | * materials provided with the distribution. * |
| 19 | * * |
| 20 | * * Neither the name of the WIDE Project or NICT nor the * |
| 21 | * names of its contributors may be used to endorse or * |
| 22 | * promote products derived from this software without * |
| 23 | * specific prior written permission of WIDE Project and * |
| 24 | * NICT. * |
| 25 | * * |
| 26 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED * |
| 27 | * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A * |
| 28 | * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR * |
| 29 | * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * |
| 30 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * |
| 31 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR * |
| 32 | * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * |
| 33 | * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * |
| 34 | *********************************************************************************************************/ |
| 35 | |
| 36 | /* Database interface module */ |
| 37 | |
| 38 | /* There is one connection to the db per thread. |
| 39 | The connection is stored in the pthread_key_t variable */ |
| 40 | |
| 41 | |
| 42 | #include "app_acct.h" |
| 43 | #include <libpq-fe.h> |
| 44 | |
/*
 * Name of the SQL type each statement parameter is cast to, looked up with the
 * AVP basic type as index (acct_db_init emits "$n::<type>" from this table).
 * The per-entry comments name the AVP type each slot is intended for; both
 * unsigned types reuse the signed SQL type of the same width ("+ cast").
 * NOTE(review): the positional layout assumes the AVP_TYPE_* values run from 0
 * to AVP_TYPE_MAX in exactly this order -- confirm against the enum declaration.
 */
const char * diam2db_types_mapping[AVP_TYPE_MAX + 1] = {
	"" 			/* AVP_TYPE_GROUPED */,
	"bytea" 		/* AVP_TYPE_OCTETSTRING */,
	"integer" 		/* AVP_TYPE_INTEGER32 */,
	"bigint" 		/* AVP_TYPE_INTEGER64 */,
	"integer" 		/* AVP_TYPE_UNSIGNED32 + cast */,
	"bigint" 		/* AVP_TYPE_UNSIGNED64 + cast */,
	"real" 			/* AVP_TYPE_FLOAT32 */,
	"double precision"	/* AVP_TYPE_FLOAT64 */
};
| 55 | |
/* The buffer holding the text of the SQL INSERT query; built once by acct_db_init */
static char *sql = NULL;

/* Parameter count handed to PQprepare whenever the statement is (re)created */
static int nbrecords = 0;

/* Name under which the INSERT statement is prepared on each connection */
static const char *stmt = "acct_db_stmt";

/* Thread-specific key holding the per-thread connection; only TEST_DEBUG builds give it external linkage */
#ifdef TEST_DEBUG
pthread_key_t connk;
#else /* TEST_DEBUG */
static pthread_key_t connk;
#endif /* TEST_DEBUG */
| 63 | |
| 64 | |
| 65 | /* Initialize the database context: connection to the DB, prepared statement to insert new records */ |
| 66 | int acct_db_init(void) |
| 67 | { |
| 68 | struct acct_record_list emptyrecords; |
| 69 | struct fd_list * li; |
| 70 | size_t sql_allocd = 0; /* The malloc'd size of the buffer */ |
| 71 | size_t sql_offset = 0; /* The actual data already written in this buffer */ |
| 72 | int idx = 0; |
| 73 | PGresult * res; |
| 74 | PGconn *conn; |
| 75 | #define REALLOC_SIZE 1024 /* We extend the buffer by this amount */ |
| 76 | |
| 77 | TRACE_ENTRY(); |
| 78 | CHECK_PARAMS( acct_config && acct_config->conninfo && acct_config->tablename ); |
| 79 | |
| 80 | CHECK_PARAMS_DO( PQisthreadsafe() == 1, { |
| 81 | fd_log_debug("You PostGreSQL installation is not thread-safe!"); |
| 82 | return EINVAL; |
| 83 | } ); |
| 84 | |
| 85 | /* Use the information from acct_config to create the connection and prepare the query */ |
| 86 | conn = PQconnectdb(acct_config->conninfo); |
| 87 | |
| 88 | /* Check to see that the backend connection was successfully made */ |
| 89 | if (PQstatus(conn) != CONNECTION_OK) { |
| 90 | fd_log_debug("Connection to database failed: %s", PQerrorMessage(conn)); |
| 91 | acct_db_free(); |
| 92 | return EINVAL; |
| 93 | } |
| 94 | if (PQprotocolVersion(conn) < 3) { |
| 95 | fd_log_debug("Database protocol version is too old, version 3 is required for prepared statements."); |
| 96 | acct_db_free(); |
| 97 | return EINVAL; |
| 98 | } |
| 99 | |
| 100 | TRACE_DEBUG(FULL, "Connection to database successful, server version %d.", PQserverVersion(conn)); |
| 101 | |
| 102 | /* Now, prepare the request object */ |
| 103 | |
| 104 | /* First, we build the list of AVP we will insert in the database */ |
| 105 | CHECK_FCT( acct_rec_prepare(&emptyrecords) ); |
| 106 | |
| 107 | /* Now, prepare the text of the request */ |
| 108 | CHECK_MALLOC(sql = malloc(REALLOC_SIZE)); |
| 109 | sql_allocd = REALLOC_SIZE; |
| 110 | |
| 111 | /* This macro hides the details of extending the buffer on each sprintf... */ |
| 112 | #define ADD_EXTEND(args...) { \ |
| 113 | size_t p; \ |
| 114 | int loop = 0; \ |
| 115 | do { \ |
| 116 | p = snprintf(sql + sql_offset, sql_allocd - sql_offset, ##args); \ |
| 117 | if (p >= sql_allocd - sql_offset) { \ |
| 118 | /* Too short, extend the buffer and start again */ \ |
| 119 | CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) ); \ |
| 120 | sql_allocd += REALLOC_SIZE; \ |
| 121 | loop++; \ |
| 122 | ASSERT(loop < 100); /* detect infinite loops */ \ |
| 123 | continue; \ |
| 124 | } \ |
| 125 | sql_offset += p; \ |
| 126 | break; \ |
| 127 | } while (1); \ |
| 128 | } |
| 129 | |
| 130 | /* This macro allows to add a value in the SQL buffer while escaping in properly */ |
| 131 | #define ADD_ESCAPE(str) { \ |
| 132 | char * __s = (char *)str; \ |
| 133 | /* Check we have at least twice the size available +1 */ \ |
| 134 | size_t p = strlen(__s); \ |
| 135 | \ |
| 136 | while (sql_allocd - sql_offset < 2 * p + 1) { \ |
| 137 | /* Too short, extend the buffer */ \ |
| 138 | CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) ); \ |
| 139 | sql_allocd += REALLOC_SIZE; \ |
| 140 | } \ |
| 141 | \ |
| 142 | /* Now add the escaped string */ \ |
| 143 | p = PQescapeStringConn(conn, sql+sql_offset, __s, p, NULL); \ |
| 144 | sql_offset += p; \ |
| 145 | } |
| 146 | |
| 147 | /* INSERT INTO table (tsfield, field1, field2, ...) VALUES (now, $1::bytea, $2::integer, ...) */ |
| 148 | ADD_EXTEND("INSERT INTO %s (", acct_config->tablename); |
| 149 | |
| 150 | if (acct_config->tsfield) { |
| 151 | ADD_EXTEND("\""); |
| 152 | ADD_ESCAPE(acct_config->tsfield); |
| 153 | ADD_EXTEND("\", "); |
| 154 | } |
| 155 | |
| 156 | if (acct_config->srvnfield) { |
| 157 | ADD_EXTEND("\""); |
| 158 | ADD_ESCAPE(acct_config->srvnfield); |
| 159 | ADD_EXTEND("\", "); |
| 160 | } |
| 161 | |
| 162 | for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) { |
| 163 | struct acct_record_item * i = (struct acct_record_item *)(li->o); |
| 164 | ADD_EXTEND("\""); |
| 165 | ADD_ESCAPE(i->param->field?:i->param->avpname); |
| 166 | if (i->index) { |
| 167 | ADD_EXTEND("%d", i->index); |
| 168 | } |
| 169 | if (li->next != &emptyrecords.all) { |
| 170 | ADD_EXTEND("\", "); |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | ADD_EXTEND("\") VALUES ("); |
| 175 | |
| 176 | if (acct_config->tsfield) { |
| 177 | ++idx; |
| 178 | ADD_EXTEND("$%d, ", idx); |
| 179 | } |
| 180 | if (acct_config->srvnfield) { |
| 181 | ADD_EXTEND("'"); |
| 182 | ADD_ESCAPE(fd_g_config->cnf_diamid); |
| 183 | ADD_EXTEND("', "); |
| 184 | } |
| 185 | |
| 186 | for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) { |
| 187 | struct acct_record_item * i = (struct acct_record_item *)(li->o); |
| 188 | ++idx; |
| 189 | ADD_EXTEND("$%d::%s", idx, diam2db_types_mapping[i->param->avptype]); |
| 190 | |
| 191 | if (li->next != &emptyrecords.all) { |
| 192 | ADD_EXTEND(", "); |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | ADD_EXTEND(");"); |
| 197 | |
| 198 | TRACE_DEBUG(FULL, "Preparing the following SQL statement: '%s'", sql); |
| 199 | res = PQprepare(conn, stmt, sql, emptyrecords.nball, NULL); |
| 200 | if (PQresultStatus(res) != PGRES_COMMAND_OK) { |
| 201 | TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s", |
| 202 | sql, PQerrorMessage(conn)); |
| 203 | PQclear(res); |
| 204 | return EINVAL; |
| 205 | } |
| 206 | PQclear(res); |
| 207 | nbrecords = emptyrecords.nball; |
| 208 | |
| 209 | acct_rec_empty(&emptyrecords); |
| 210 | |
| 211 | CHECK_POSIX( pthread_key_create(&connk, (void (*)(void*))PQfinish) ); |
| 212 | CHECK_POSIX( pthread_setspecific(connk, conn) ); |
| 213 | |
| 214 | /* Ok, ready */ |
| 215 | return 0; |
| 216 | } |
| 217 | |
| 218 | /* Terminate the connection to the DB */ |
| 219 | void acct_db_free(void) |
| 220 | { |
| 221 | CHECK_POSIX_DO(pthread_key_delete(connk) , ); |
| 222 | free(sql); |
| 223 | } |
| 224 | |
| 225 | /* When a new message has been received, insert the content of the parsed mapping into the DB (using prepared statement) */ |
| 226 | int acct_db_insert(struct acct_record_list * records) |
| 227 | { |
| 228 | char **val; |
| 229 | int *val_len; |
| 230 | int *val_isbin; |
| 231 | int idx = 0; |
| 232 | int size = 0; |
| 233 | PGresult *res; |
| 234 | struct fd_list *li; |
| 235 | PGconn *conn; |
| 236 | int new = 0; |
| 237 | |
| 238 | TRACE_ENTRY("%p", records); |
| 239 | CHECK_PARAMS( records ); |
| 240 | |
| 241 | conn = pthread_getspecific(connk); |
| 242 | if (!conn) { |
| 243 | conn = PQconnectdb(acct_config->conninfo); |
| 244 | CHECK_POSIX( pthread_setspecific(connk, conn) ); |
| 245 | |
| 246 | new = 1; |
| 247 | } |
| 248 | |
| 249 | /* First, check if the connection with the DB has not staled, and eventually try to fix it */ |
| 250 | if (PQstatus(conn) != CONNECTION_OK) { |
| 251 | /* Attempt a reset */ |
| 252 | PQreset(conn); |
| 253 | if (PQstatus(conn) != CONNECTION_OK) { |
| 254 | TRACE_DEBUG(INFO, "Lost connection to the database server, and attempt to reestablish it failed"); |
| 255 | TODO("Terminate the freeDiameter instance completely?"); |
| 256 | return ENOTCONN; |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | if (new) { |
| 261 | /* Create the prepared statement for this ocnnection, it is not shared */ |
| 262 | res = PQprepare(conn, stmt, sql, nbrecords, NULL); |
| 263 | if (PQresultStatus(res) != PGRES_COMMAND_OK) { |
| 264 | TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s", |
| 265 | sql, PQerrorMessage(conn)); |
| 266 | PQclear(res); |
| 267 | return EINVAL; |
| 268 | } |
| 269 | PQclear(res); |
| 270 | } |
| 271 | |
| 272 | size = acct_config->tsfield ? records->nball + 1 : records->nball; |
| 273 | |
| 274 | /* Alloc the arrays of parameters */ |
| 275 | CHECK_MALLOC( val = calloc(size, sizeof(const char *)) ); |
| 276 | CHECK_MALLOC( val_len = calloc(size, sizeof(const int)) ); |
| 277 | CHECK_MALLOC( val_isbin = calloc(size, sizeof(const int)) ); |
| 278 | |
| 279 | if (acct_config->tsfield) { |
| 280 | val[idx] = "now"; |
| 281 | val_len[idx] = 3; |
| 282 | val_isbin[idx] = 0; |
| 283 | idx++; |
| 284 | } |
| 285 | |
| 286 | /* Now write all the map'd records in these arrays */ |
| 287 | for (li = records->all.next; li != &records->all; li = li->next) { |
| 288 | struct acct_record_item * r = (struct acct_record_item *)(li->o); |
| 289 | if (r->value) { |
| 290 | val_isbin[idx] = 1; /* We always pass binary parameters */ |
| 291 | switch (r->param->avptype) { |
| 292 | case AVP_TYPE_OCTETSTRING: |
| 293 | val[idx] = (void *)(r->value->os.data); |
| 294 | val_len[idx] = r->value->os.len; |
| 295 | break; |
| 296 | |
| 297 | case AVP_TYPE_INTEGER32: |
| 298 | case AVP_TYPE_UNSIGNED32: |
| 299 | case AVP_TYPE_FLOAT32: |
| 300 | r->scalar.v32 = htonl(r->value->u32); |
| 301 | val[idx] = &r->scalar.c; |
| 302 | val_len[idx] = sizeof(uint32_t); |
| 303 | break; |
| 304 | |
| 305 | case AVP_TYPE_INTEGER64: |
| 306 | case AVP_TYPE_UNSIGNED64: |
| 307 | case AVP_TYPE_FLOAT64: |
| 308 | r->scalar.v64 = htonll(r->value->u64); |
| 309 | val[idx] = &r->scalar.c; |
| 310 | val_len[idx] = sizeof(uint64_t); |
| 311 | break; |
| 312 | |
| 313 | default: |
| 314 | ASSERT(0); /* detect bugs */ |
| 315 | } |
| 316 | } |
| 317 | |
| 318 | idx++; |
| 319 | } |
| 320 | |
| 321 | /* OK, now execute the SQL statement */ |
| 322 | res = PQexecPrepared(conn, stmt, size, (const char * const *)val, val_len, val_isbin, 1 /* We actually don't care here */); |
| 323 | |
| 324 | /* Done with the parameters */ |
| 325 | free(val); |
| 326 | free(val_len); |
| 327 | free(val_isbin); |
| 328 | |
| 329 | /* Now check the result code */ |
| 330 | if (PQresultStatus(res) != PGRES_COMMAND_OK) { |
| 331 | TRACE_DEBUG(INFO, "An error occurred while INSERTing in the database: %s", PQerrorMessage(conn)); |
| 332 | PQclear(res); |
| 333 | return EINVAL; /* It was probably a mistake in configuration file... */ |
| 334 | } |
| 335 | PQclear(res); |
| 336 | |
| 337 | /* Ok, we are done */ |
| 338 | return 0; |
| 339 | } |
| 340 | |
| 341 | |