pg: Statement execution + better error reporting

This commit is contained in:
Yorhel 2025-02-06 09:05:05 +01:00
parent 922f19e39b
commit 711300b227
6 changed files with 124 additions and 26 deletions

4
FU.xs
View file

@ -103,6 +103,10 @@ void columns(fupg_st *st)
CODE: CODE:
ST(0) = fupg_st_columns(aTHX_ st); ST(0) = fupg_st_columns(aTHX_ st);
void exec(fupg_st *st)
CODE:
ST(0) = fupg_st_exec(aTHX_ st);
void DESTROY(fupg_st *st) void DESTROY(fupg_st *st)
CODE: CODE:
fupg_st_destroy(st); fupg_st_destroy(st);

View file

@ -9,9 +9,7 @@ package FU::PG::conn {
}; };
package FU::PG::error { package FU::PG::error {
use overload '""' => sub($e, @) { use overload '""' => sub($e, @) { $e->{full_message} };
$e->{verbose_message} || "$e->{severity}: $e->{message}\n";
};
} }
1; 1;
@ -100,9 +98,9 @@ of bind parameters. C<$sql> can only hold a single statement.
Parameters can be referenced from C<$sql> with numbered placeholders, where Parameters can be referenced from C<$sql> with numbered placeholders, where
C<$1> refers to the first parameter, C<$2> to the second, etc. Be careful to C<$1> refers to the first parameter, C<$2> to the second, etc. Be careful to
not accidentally interpolate perl's C<$1> and C<$2>. Using a question mark for not accidentally interpolate perl's C<$1> and C<$2>. Using a question mark for
placeholders, as is common with L<DBI>, is not supported. If a placeholder placeholders, as is common with L<DBI>, is not supported. An error is thrown
mentioned in C<$sql> is not present in C<@params>, I<NULL> is assumed instead. when attempting to execute a query where the number of C<@params> does not
Excess C<@params> that are not referenced by C<$sql> are ignored. match the number of placeholders in C<$sql>.
Note that this method just creates a statement object, the given query is not Note that this method just creates a statement object, the given query is not
prepared or executed until the appropriate statement methods (see below) are prepared or executed until the appropriate statement methods (see below) are

View file

@ -9,6 +9,35 @@ static SV *fupg_selfobj_(pTHX_ SV **self, void *obj, const char *klass) {
/* Return an SV to use for croak_sv() with a HV object.
* Adds a "full_message" field including stack trace. */
__attribute__((format (printf, 3, 4)))
static SV *fu_croak_hv(HV *hv, const char *klass, const char *message, ...) {
va_list args;
SV *sv;
dTHX;
dSP;
va_start(args, message);
sv = vnewSVpvf(message, &args);
va_end(args);
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(sv_2mortal(sv));
PUTBACK;
call_pv("Carp::longmess", G_SCALAR);
hv_stores(hv, "full_message", SvREFCNT_inc(POPs));
FREETMPS;
LEAVE;
return sv_bless(sv_2mortal(newRV_noinc((SV *)hv)), gv_stashpv(klass, GV_ADD));
}
/* Custom string builder, should be slightly faster than using Sv* macros directly. */ /* Custom string builder, should be slightly faster than using Sv* macros directly. */
typedef struct { typedef struct {

View file

@ -41,6 +41,7 @@ typedef enum { PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS, PQSHOW_CONTEXT_ALWAY
X(PQdescribePrepared, PGresult *, PGconn *, const char *) \ X(PQdescribePrepared, PGresult *, PGconn *, const char *) \
X(PQerrorMessage, char *, const PGconn *) \ X(PQerrorMessage, char *, const PGconn *) \
X(PQexec, PGresult *, PGconn *, const char *) \ X(PQexec, PGresult *, PGconn *, const char *) \
X(PQexecPrepared, PGresult *, PGconn *, const char *, int, const char * const *, const int *, const int *, int) \
X(PQfinish, void, PGconn *) \ X(PQfinish, void, PGconn *) \
X(PQfmod, int, const PGresult *, int) \ X(PQfmod, int, const PGresult *, int) \
X(PQfname, char *, const PGresult *, int) \ X(PQfname, char *, const PGresult *, int) \

View file

@ -11,7 +11,7 @@ static SV *fupg_conn_errsv(PGconn *conn, const char *action) {
hv_stores(hv, "action", newSVpv(action, 0)); hv_stores(hv, "action", newSVpv(action, 0));
hv_stores(hv, "severity", newSVpvs("FATAL")); /* Connection-related errors are always fatal */ hv_stores(hv, "severity", newSVpvs("FATAL")); /* Connection-related errors are always fatal */
hv_stores(hv, "message", newSVpv(PQerrorMessage(conn), 0)); hv_stores(hv, "message", newSVpv(PQerrorMessage(conn), 0));
return sv_bless(sv_2mortal(newRV_noinc((SV *)hv)), gv_stashpvs("FU::PG::error", GV_ADD)); return fu_croak_hv(hv, "FU::PG::error", "FATAL: %s", PQerrorMessage(conn));
} }
static void fupg_conn_croak(fupg_conn *c, const char *action) { static void fupg_conn_croak(fupg_conn *c, const char *action) {
@ -23,11 +23,10 @@ static void fupg_conn_croak(fupg_conn *c, const char *action) {
static void fupg_result_croak(PGresult *r, const char *action, const char *query) { static void fupg_result_croak(PGresult *r, const char *action, const char *query) {
dTHX; dTHX;
HV *hv = newHV(); HV *hv = newHV();
char *s;
hv_stores(hv, "action", newSVpv(action, 0)); hv_stores(hv, "action", newSVpv(action, 0));
s = PQresultErrorField(r, PG_DIAG_SEVERITY_NONLOCALIZED); char *s = PQresultErrorField(r, PG_DIAG_SEVERITY_NONLOCALIZED);
hv_stores(hv, "severity", newSVpv(s ? s : "FATAL", 0)); hv_stores(hv, "severity", newSVpv(s ? s : "FATAL", 0));
if (query) hv_stores(hv, "query", newSVpv(s, 0)); if (query) hv_stores(hv, "query", newSVpv(query, 0));
/* If the PGresult is not an error, assume it's an unexpected resultStatus */ /* If the PGresult is not an error, assume it's an unexpected resultStatus */
s = PQresultErrorField(r, PG_DIAG_MESSAGE_PRIMARY); s = PQresultErrorField(r, PG_DIAG_MESSAGE_PRIMARY);
@ -36,11 +35,12 @@ static void fupg_result_croak(PGresult *r, const char *action, const char *query
/* I like the verbose error messages. Doesn't include anything that's not /* I like the verbose error messages. Doesn't include anything that's not
* also fetched below, but saves me from having to do the formatting * also fetched below, but saves me from having to do the formatting
* manually. */ * manually. */
char *verbose = NULL;
if (s) { if (s) {
s = PQresultVerboseErrorMessage(r, 2 /* PQERRORS_VERBOSE */, 1 /* PQSHOW_CONTEXT_ERRORS */); verbose = PQresultVerboseErrorMessage(r, PQERRORS_VERBOSE, PQSHOW_CONTEXT_ERRORS);
if (s) { if (s) {
hv_stores(hv, "verbose_message", newSVpv(s, 0)); hv_stores(hv, "verbose_message", newSVpv(verbose, 0));
PQfreemem(s); PQfreemem(verbose);
} }
} }
@ -60,7 +60,13 @@ static void fupg_result_croak(PGresult *r, const char *action, const char *query
if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FUNCTION))) hv_stores(hv, "source_function", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FUNCTION))) hv_stores(hv, "source_function", newSVpv(s, 0));
PQclear(r); PQclear(r);
croak_sv(sv_bless(sv_2mortal(newRV_noinc((SV *)hv)), gv_stashpvs("FU::PG::error", GV_ADD))); croak_sv(verbose
? fu_croak_hv(hv, "FU::PG::error", "%s", SvPV_nolen(*hv_fetchs(hv, "verbose_message", 0)))
: fu_croak_hv(hv, "FU::PG::error", "%s: %s",
SvPV_nolen(*hv_fetchs(hv, "severity", 0)),
SvPV_nolen(*hv_fetchs(hv, "message", 0))
)
);
} }
static SV *fupg_connect(pTHX_ const char *str) { static SV *fupg_connect(pTHX_ const char *str) {
@ -81,6 +87,17 @@ static void fupg_destroy(fupg_conn *c) {
safefree(c); safefree(c);
} }
static SV *fupg_exec_result(pTHX_ PGresult *r) {
SV *ret = &PL_sv_undef;
char *tup = PQcmdTuples(r);
if (tup && *tup) {
ret = sv_2mortal(newSVpv(tup, 0));
SvIV(ret);
SvIOK_only(ret);
}
return ret;
}
static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) { static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
PGresult *r = PQexec(c->conn, sql); PGresult *r = PQexec(c->conn, sql);
if (!r) fupg_conn_croak(c, "exec"); if (!r) fupg_conn_croak(c, "exec");
@ -90,13 +107,7 @@ static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
case PGRES_TUPLES_OK: break; case PGRES_TUPLES_OK: break;
default: fupg_result_croak(r, "exec", sql); default: fupg_result_croak(r, "exec", sql);
} }
SV *ret = &PL_sv_undef; SV *ret = fupg_exec_result(r);
char *tup = PQcmdTuples(r);
if (tup && *tup) {
ret = sv_2mortal(newSVpv(tup, 0));
SvIV(ret);
SvIOK_only(ret);
}
PQclear(r); PQclear(r);
return ret; return ret;
} }
@ -112,6 +123,10 @@ typedef struct {
char name[32]; char name[32];
int prepared; int prepared;
PGresult *describe; PGresult *describe;
/* Set during execute */
int paramn;
char **param;
PGresult *result;
} fupg_st; } fupg_st;
static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) { static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
@ -122,8 +137,8 @@ static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
st->query = savepv(query); st->query = savepv(query);
if (argc > 2) { if (argc > 2) {
st->bind = safemalloc((argc-2) * sizeof(SV *)); st->bind = safemalloc((argc-2) * sizeof(SV *));
I32 i = 2; I32 i;
while (i < argc) { for (i=2; i < argc; i++) {
st->bind[st->bindn] = newSV(0); st->bind[st->bindn] = newSV(0);
sv_setsv(st->bind[st->bindn], ST(i)); sv_setsv(st->bind[st->bindn], ST(i));
st->bindn++; st->bindn++;
@ -157,7 +172,7 @@ static void fupg_st_prepare(pTHX_ fupg_st *st) {
} }
static SV *fupg_st_params(pTHX_ fupg_st *st) { static SV *fupg_st_params(pTHX_ fupg_st *st) {
fupg_st_prepare(st); fupg_st_prepare(aTHX_ st);
int i, nparams = PQnparams(st->describe); int i, nparams = PQnparams(st->describe);
AV *av = newAV_alloc_x(nparams); AV *av = newAV_alloc_x(nparams);
for (i=0; i<nparams; i++) { for (i=0; i<nparams; i++) {
@ -169,7 +184,7 @@ static SV *fupg_st_params(pTHX_ fupg_st *st) {
} }
static SV *fupg_st_columns(pTHX_ fupg_st *st) { static SV *fupg_st_columns(pTHX_ fupg_st *st) {
fupg_st_prepare(st); fupg_st_prepare(aTHX_ st);
int i, ncols = PQnfields(st->describe); int i, ncols = PQnfields(st->describe);
AV *av = newAV_alloc_x(ncols); AV *av = newAV_alloc_x(ncols);
for (i=0; i<ncols; i++) { for (i=0; i<ncols; i++) {
@ -184,6 +199,47 @@ static SV *fupg_st_columns(pTHX_ fupg_st *st) {
return sv_2mortal(newRV_noinc((SV *)av)); return sv_2mortal(newRV_noinc((SV *)av));
} }
static void fupg_st_execute(pTHX_ fupg_st *st) {
/* Disallow fetching the results more than once. I don't see a reason why
* someone would need that and disallowing it leaves room for fetching the
* results in a streaming fashion without breaking API compat. */
if (st->result) croak("Invalid attempt to execute statement multiple times");
/* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */
fupg_st_prepare(aTHX_ st);
st->param = safemalloc(st->bindn * sizeof(char *));
int i;
for (i=0; i<st->bindn; i++) {
st->param[i] = SvPVutf8_nolen(st->bind[i]);
st->paramn++;
}
/* I'm not super fond of this approach. Storing the full query results in a
* PGresult involves unnecessary parsing, memory allocation and copying.
* The wire protocol is sufficiently simple that I could parse the query
* results directly from the network buffers without much additional code,
* and that would be much more efficient. Alas, libpq doesn't let me do
* that.
* There is the option of fetching results in chunked mode, but from what I
* gather that just saves a bit of memory in exchange for more and smaller
* malloc()/free()'s. Performance-wise, it probably won't be much of an
* improvement */
PGresult *r = PQexecPrepared(st->conn->conn, st->name, st->paramn, (const char * const*)st->param, NULL, NULL, 0);
if (!r) fupg_conn_croak(st->conn , "exec");
switch (PQresultStatus(r)) {
case PGRES_EMPTY_QUERY:
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK: break;
default: fupg_result_croak(r, "exec", st->query);
}
st->result = r;
}
static SV *fupg_st_exec(pTHX_ fupg_st *st) {
fupg_st_execute(aTHX_ st);
return fupg_exec_result(st->result);
}
static void fupg_st_destroy(fupg_st *st) { static void fupg_st_destroy(fupg_st *st) {
int i; int i;
/* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */
@ -192,7 +248,10 @@ static void fupg_st_destroy(fupg_st *st) {
if (st->query) safefree(st->query); if (st->query) safefree(st->query);
for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]); for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]);
if (st->bind) safefree(st->bind); if (st->bind) safefree(st->bind);
if (st->describe) PQclear(st->describe); /* XXX: These point into bind SVs (for now): for (i=0; i < st->paramn; i++) safefree(st->param[i]); */
if (st->param) safefree(st->param);
PQclear(st->describe);
PQclear(st->result);
SvREFCNT_dec(st->conn->self); SvREFCNT_dec(st->conn->self);
safefree(st); safefree(st);
} }

View file

@ -40,10 +40,17 @@ okerr ERROR => prepare => qr/syntax error/;
is_deeply $st->params, []; is_deeply $st->params, [];
is_deeply $st->columns, [{ name => '?column?', oid => 23 }]; is_deeply $st->columns, [{ name => '?column?', oid => 23 }];
is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 1; is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 1;
is $st->exec, 1;
} }
is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 0; is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 0;
ok !eval { $conn->q('SELECT 1', 1)->exec; 1 };
okerr ERROR => exec => qr/bind message supplies 1 parameters, but prepared statement/;
ok !eval { $conn->q('SELECT $1')->exec; 1 };
okerr ERROR => exec => qr/bind message supplies 0 parameters, but prepared statement/;
{ {
my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\""); my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\"");
undef $conn; # statement keeps the connection alive undef $conn; # statement keeps the connection alive