blob: a444c94e01828a924c95893b2f9eeb5c7759737d [file] [log] [blame]
Brian Waters13d96012017-12-08 16:53:31 -06001/*********************************************************************************************************
2* Software License Agreement (BSD License) *
3* Author: Sebastien Decugis <sdecugis@freediameter.net> *
4* *
5* Copyright (c) 2013, WIDE Project and NICT *
6* All rights reserved. *
7* *
8* Redistribution and use of this software in source and binary forms, with or without modification, are *
9* permitted provided that the following conditions are met: *
10* *
11* * Redistributions of source code must retain the above *
12* copyright notice, this list of conditions and the *
13* following disclaimer. *
14* *
15* * Redistributions in binary form must reproduce the above *
16* copyright notice, this list of conditions and the *
17* following disclaimer in the documentation and/or other *
18* materials provided with the distribution. *
19* *
20* * Neither the name of the WIDE Project or NICT nor the *
21* names of its contributors may be used to endorse or *
22* promote products derived from this software without *
23* specific prior written permission of WIDE Project and *
24* NICT. *
25* *
26* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34*********************************************************************************************************/
35
36/* Database interface module */
37
/* There is one connection to the database per thread.
   Each thread's connection is stored as thread-specific data under the pthread key connk. */
40
41
42#include "app_acct.h"
43#include <libpq-fe.h>
44
45const char * diam2db_types_mapping[AVP_TYPE_MAX + 1] = {
46 "" /* AVP_TYPE_GROUPED */,
47 "bytea" /* AVP_TYPE_OCTETSTRING */,
48 "integer" /* AVP_TYPE_INTEGER32 */,
49 "bigint" /* AVP_TYPE_INTEGER64 */,
50 "integer" /* AVP_TYPE_UNSIGNED32 + cast */,
51 "bigint" /* AVP_TYPE_UNSIGNED64 + cast */,
52 "real" /* AVP_TYPE_FLOAT32 */,
53 "double precision" /* AVP_TYPE_FLOAT64 */
54};
55
/* Name under which the INSERT statement is prepared on each connection */
static const char * stmt = "acct_db_stmt";

/* Key giving access to the PGconn of the calling thread.
   It is only visible outside of this file in TEST_DEBUG builds. */
#ifdef TEST_DEBUG
pthread_key_t connk;
#else /* TEST_DEBUG */
static pthread_key_t connk;
#endif /* TEST_DEBUG */

/* The buffer that will contain the SQL query */
static char * sql = NULL;

/* Number of records saved by acct_db_init, passed again to PQprepare for the other connections */
static int nbrecords = 0;
63
64
65/* Initialize the database context: connection to the DB, prepared statement to insert new records */
66int acct_db_init(void)
67{
68 struct acct_record_list emptyrecords;
69 struct fd_list * li;
70 size_t sql_allocd = 0; /* The malloc'd size of the buffer */
71 size_t sql_offset = 0; /* The actual data already written in this buffer */
72 int idx = 0;
73 PGresult * res;
74 PGconn *conn;
75 #define REALLOC_SIZE 1024 /* We extend the buffer by this amount */
76
77 TRACE_ENTRY();
78 CHECK_PARAMS( acct_config && acct_config->conninfo && acct_config->tablename );
79
80 CHECK_PARAMS_DO( PQisthreadsafe() == 1, {
81 fd_log_debug("You PostGreSQL installation is not thread-safe!");
82 return EINVAL;
83 } );
84
85 /* Use the information from acct_config to create the connection and prepare the query */
86 conn = PQconnectdb(acct_config->conninfo);
87
88 /* Check to see that the backend connection was successfully made */
89 if (PQstatus(conn) != CONNECTION_OK) {
90 fd_log_debug("Connection to database failed: %s", PQerrorMessage(conn));
91 acct_db_free();
92 return EINVAL;
93 }
94 if (PQprotocolVersion(conn) < 3) {
95 fd_log_debug("Database protocol version is too old, version 3 is required for prepared statements.");
96 acct_db_free();
97 return EINVAL;
98 }
99
100 TRACE_DEBUG(FULL, "Connection to database successful, server version %d.", PQserverVersion(conn));
101
102 /* Now, prepare the request object */
103
104 /* First, we build the list of AVP we will insert in the database */
105 CHECK_FCT( acct_rec_prepare(&emptyrecords) );
106
107 /* Now, prepare the text of the request */
108 CHECK_MALLOC(sql = malloc(REALLOC_SIZE));
109 sql_allocd = REALLOC_SIZE;
110
111 /* This macro hides the details of extending the buffer on each sprintf... */
112 #define ADD_EXTEND(args...) { \
113 size_t p; \
114 int loop = 0; \
115 do { \
116 p = snprintf(sql + sql_offset, sql_allocd - sql_offset, ##args); \
117 if (p >= sql_allocd - sql_offset) { \
118 /* Too short, extend the buffer and start again */ \
119 CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) ); \
120 sql_allocd += REALLOC_SIZE; \
121 loop++; \
122 ASSERT(loop < 100); /* detect infinite loops */ \
123 continue; \
124 } \
125 sql_offset += p; \
126 break; \
127 } while (1); \
128 }
129
130 /* This macro allows to add a value in the SQL buffer while escaping in properly */
131 #define ADD_ESCAPE(str) { \
132 char * __s = (char *)str; \
133 /* Check we have at least twice the size available +1 */ \
134 size_t p = strlen(__s); \
135 \
136 while (sql_allocd - sql_offset < 2 * p + 1) { \
137 /* Too short, extend the buffer */ \
138 CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) ); \
139 sql_allocd += REALLOC_SIZE; \
140 } \
141 \
142 /* Now add the escaped string */ \
143 p = PQescapeStringConn(conn, sql+sql_offset, __s, p, NULL); \
144 sql_offset += p; \
145 }
146
147 /* INSERT INTO table (tsfield, field1, field2, ...) VALUES (now, $1::bytea, $2::integer, ...) */
148 ADD_EXTEND("INSERT INTO %s (", acct_config->tablename);
149
150 if (acct_config->tsfield) {
151 ADD_EXTEND("\"");
152 ADD_ESCAPE(acct_config->tsfield);
153 ADD_EXTEND("\", ");
154 }
155
156 if (acct_config->srvnfield) {
157 ADD_EXTEND("\"");
158 ADD_ESCAPE(acct_config->srvnfield);
159 ADD_EXTEND("\", ");
160 }
161
162 for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
163 struct acct_record_item * i = (struct acct_record_item *)(li->o);
164 ADD_EXTEND("\"");
165 ADD_ESCAPE(i->param->field?:i->param->avpname);
166 if (i->index) {
167 ADD_EXTEND("%d", i->index);
168 }
169 if (li->next != &emptyrecords.all) {
170 ADD_EXTEND("\", ");
171 }
172 }
173
174 ADD_EXTEND("\") VALUES (");
175
176 if (acct_config->tsfield) {
177 ++idx;
178 ADD_EXTEND("$%d, ", idx);
179 }
180 if (acct_config->srvnfield) {
181 ADD_EXTEND("'");
182 ADD_ESCAPE(fd_g_config->cnf_diamid);
183 ADD_EXTEND("', ");
184 }
185
186 for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
187 struct acct_record_item * i = (struct acct_record_item *)(li->o);
188 ++idx;
189 ADD_EXTEND("$%d::%s", idx, diam2db_types_mapping[i->param->avptype]);
190
191 if (li->next != &emptyrecords.all) {
192 ADD_EXTEND(", ");
193 }
194 }
195
196 ADD_EXTEND(");");
197
198 TRACE_DEBUG(FULL, "Preparing the following SQL statement: '%s'", sql);
199 res = PQprepare(conn, stmt, sql, emptyrecords.nball, NULL);
200 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
201 TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
202 sql, PQerrorMessage(conn));
203 PQclear(res);
204 return EINVAL;
205 }
206 PQclear(res);
207 nbrecords = emptyrecords.nball;
208
209 acct_rec_empty(&emptyrecords);
210
211 CHECK_POSIX( pthread_key_create(&connk, (void (*)(void*))PQfinish) );
212 CHECK_POSIX( pthread_setspecific(connk, conn) );
213
214 /* Ok, ready */
215 return 0;
216}
217
218/* Terminate the connection to the DB */
219void acct_db_free(void)
220{
221 CHECK_POSIX_DO(pthread_key_delete(connk) , );
222 free(sql);
223}
224
225/* When a new message has been received, insert the content of the parsed mapping into the DB (using prepared statement) */
226int acct_db_insert(struct acct_record_list * records)
227{
228 char **val;
229 int *val_len;
230 int *val_isbin;
231 int idx = 0;
232 int size = 0;
233 PGresult *res;
234 struct fd_list *li;
235 PGconn *conn;
236 int new = 0;
237
238 TRACE_ENTRY("%p", records);
239 CHECK_PARAMS( records );
240
241 conn = pthread_getspecific(connk);
242 if (!conn) {
243 conn = PQconnectdb(acct_config->conninfo);
244 CHECK_POSIX( pthread_setspecific(connk, conn) );
245
246 new = 1;
247 }
248
249 /* First, check if the connection with the DB has not staled, and eventually try to fix it */
250 if (PQstatus(conn) != CONNECTION_OK) {
251 /* Attempt a reset */
252 PQreset(conn);
253 if (PQstatus(conn) != CONNECTION_OK) {
254 TRACE_DEBUG(INFO, "Lost connection to the database server, and attempt to reestablish it failed");
255 TODO("Terminate the freeDiameter instance completely?");
256 return ENOTCONN;
257 }
258 }
259
260 if (new) {
261 /* Create the prepared statement for this ocnnection, it is not shared */
262 res = PQprepare(conn, stmt, sql, nbrecords, NULL);
263 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
264 TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
265 sql, PQerrorMessage(conn));
266 PQclear(res);
267 return EINVAL;
268 }
269 PQclear(res);
270 }
271
272 size = acct_config->tsfield ? records->nball + 1 : records->nball;
273
274 /* Alloc the arrays of parameters */
275 CHECK_MALLOC( val = calloc(size, sizeof(const char *)) );
276 CHECK_MALLOC( val_len = calloc(size, sizeof(const int)) );
277 CHECK_MALLOC( val_isbin = calloc(size, sizeof(const int)) );
278
279 if (acct_config->tsfield) {
280 val[idx] = "now";
281 val_len[idx] = 3;
282 val_isbin[idx] = 0;
283 idx++;
284 }
285
286 /* Now write all the map'd records in these arrays */
287 for (li = records->all.next; li != &records->all; li = li->next) {
288 struct acct_record_item * r = (struct acct_record_item *)(li->o);
289 if (r->value) {
290 val_isbin[idx] = 1; /* We always pass binary parameters */
291 switch (r->param->avptype) {
292 case AVP_TYPE_OCTETSTRING:
293 val[idx] = (void *)(r->value->os.data);
294 val_len[idx] = r->value->os.len;
295 break;
296
297 case AVP_TYPE_INTEGER32:
298 case AVP_TYPE_UNSIGNED32:
299 case AVP_TYPE_FLOAT32:
300 r->scalar.v32 = htonl(r->value->u32);
301 val[idx] = &r->scalar.c;
302 val_len[idx] = sizeof(uint32_t);
303 break;
304
305 case AVP_TYPE_INTEGER64:
306 case AVP_TYPE_UNSIGNED64:
307 case AVP_TYPE_FLOAT64:
308 r->scalar.v64 = htonll(r->value->u64);
309 val[idx] = &r->scalar.c;
310 val_len[idx] = sizeof(uint64_t);
311 break;
312
313 default:
314 ASSERT(0); /* detect bugs */
315 }
316 }
317
318 idx++;
319 }
320
321 /* OK, now execute the SQL statement */
322 res = PQexecPrepared(conn, stmt, size, (const char * const *)val, val_len, val_isbin, 1 /* We actually don't care here */);
323
324 /* Done with the parameters */
325 free(val);
326 free(val_len);
327 free(val_isbin);
328
329 /* Now check the result code */
330 if (PQresultStatus(res) != PGRES_COMMAND_OK) {
331 TRACE_DEBUG(INFO, "An error occurred while INSERTing in the database: %s", PQerrorMessage(conn));
332 PQclear(res);
333 return EINVAL; /* It was probably a mistake in configuration file... */
334 }
335 PQclear(res);
336
337 /* Ok, we are done */
338 return 0;
339}
340
341