From 187417f160adb750510e1d6a4353cea59cbd8df4 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Wed, 5 Feb 2025 11:49:22 +0100 Subject: [PATCH] pg: Statement preparing + inspection; less wonky object handling? --- FU.xs | 38 ++++++++++++++++-- c/common.c | 11 ++++++ c/libpq.h | 16 +++++++- c/pgconn.c | 104 ++++++++++++++++++++++++++++++++++++++++++++++++-- t/pgconnect.t | 27 ++++++++++++- 5 files changed, 186 insertions(+), 10 deletions(-) diff --git a/FU.xs b/FU.xs index 674d65d..a3b796d 100644 --- a/FU.xs +++ b/FU.xs @@ -1,5 +1,7 @@ +#include #include +#undef PERL_IMPLICIT_SYS #define PERL_NO_GET_CONTEXT #include "EXTERN.h" #include "perl.h" @@ -20,11 +22,16 @@ PROTOTYPES: DISABLE TYPEMAP: <conn, stderr); + else PQuntrace(c->conn); + ST(0) = c->self; + void exec(fupg_conn *c, SV *sv) CODE: - ST(0) = fupg_exec(c, SvPVutf8_nolen(sv)); + ST(0) = fupg_exec(aTHX_ c, SvPVutf8_nolen(sv)); + +void q(fupg_conn *c, SV *sv, ...) + CODE: + ST(0) = fupg_q(aTHX_ c, SvPVutf8_nolen(sv), ax, items); void DESTROY(fupg_conn *c) CODE: fupg_destroy(c); + + +MODULE = FU PACKAGE = FU::PG::st + +void params(fupg_st *st) + CODE: + ST(0) = fupg_st_params(aTHX_ st); + +void columns(fupg_st *st) + CODE: + ST(0) = fupg_st_columns(aTHX_ st); + +void DESTROY(fupg_st *st) + CODE: + fupg_st_destroy(st); diff --git a/c/common.c b/c/common.c index 6139e04..1dfc45e 100644 --- a/c/common.c +++ b/c/common.c @@ -1,3 +1,14 @@ +/* Because I don't know how to use sv_setref_pv() correctly. */ + +static SV *fupg_selfobj_(pTHX_ SV **self, void *obj, const char *klass) { + *self = newSViv(PTR2IV(obj)); + return sv_bless(sv_2mortal(newRV_noinc(*self)), gv_stashpv(klass, GV_ADD)); +} +/* Write a blessed SV to obj->self and returns a mortal ref to it */ +#define fupg_selfobj(obj, klass) fupg_selfobj_(aTHX_ &((obj)->self), obj, klass) + + + /* Custom string builder, should be slightly faster than using Sv* macros directly. */ typedef struct { diff --git a/c/libpq.h b/c/libpq.h index 8986c79..e9532de 100644 --- a/c/libpq.h +++ b/c/libpq.h @@ -4,6 +4,7 @@ typedef struct PGconn PGconn; typedef struct PGresult PGresult; +typedef unsigned int Oid; typedef enum { PGRES_EMPTY_QUERY = 0, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PGRES_COPY_OUT, PGRES_COPY_IN, @@ -34,20 +35,31 @@ typedef enum { PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS, PQSHOW_CONTEXT_ALWAY #define PG_FUNCS \ X(PQclear, void, PGresult *) \ - X(PQconnectdb, PGconn *, const char *) \ + X(PQclosePrepared, PGresult *, PGconn *, const char *) \ X(PQcmdTuples, char *, PGresult *) \ + X(PQconnectdb, PGconn *, const char *) \ + X(PQdescribePrepared, PGresult *, PGconn *, const char *) \ X(PQerrorMessage, char *, const PGconn *) \ X(PQexec, PGresult *, PGconn *, const char *) \ X(PQfinish, void, PGconn *) \ + X(PQfmod, int, const PGresult *, int) \ + X(PQfname, char *, const PGresult *, int) \ X(PQfreemem, void, void *) \ + X(PQftype, Oid, const PGresult *, int) \ X(PQlibVersion, int, void) \ + X(PQnfields, int, const PGresult *) \ + X(PQnparams, int, const PGresult *) \ + X(PQparamtype, Oid, const PGresult *, int) \ + X(PQprepare, PGresult *, PGconn *, const char *, const char *, int, const Oid *) \ X(PQresStatus, char *, ExecStatusType) \ X(PQresultErrorField, char *, const PGresult *, int) \ X(PQresultErrorMessage, char *, const PGresult *res) \ X(PQresultStatus, ExecStatusType, const PGresult *) \ X(PQresultVerboseErrorMessage, char *, const PGresult *, PGVerbosity, PGContextVisibility) \ X(PQserverVersion, int, const PGconn *) \ - X(PQstatus, int, const PGconn *) + X(PQstatus, int, const PGconn *) \ + X(PQtrace, void, PGconn *, FILE *) \ + X(PQuntrace, void, PGconn *) #define X(n, r, ...) static r (*n)(__VA_ARGS__); PG_FUNCS diff --git a/c/pgconn.c b/c/pgconn.c index e507a54..187eeeb 100644 --- a/c/pgconn.c +++ b/c/pgconn.c @@ -1,5 +1,7 @@ typedef struct { + SV *self; PGconn *conn; + UV prep_counter; } fupg_conn; @@ -61,7 +63,7 @@ static void fupg_result_croak(PGresult *r, const char *action, const char *query croak_sv(sv_bless(sv_2mortal(newRV_noinc((SV *)hv)), gv_stashpvs("FU::PG::error", GV_ADD))); } -static fupg_conn *fupg_connect(pTHX_ const char *str) { +static SV *fupg_connect(pTHX_ const char *str) { PGconn *conn = PQconnectdb(str); if (PQstatus(conn) != 0) { SV *sv = fupg_conn_errsv(conn, "connect"); @@ -71,7 +73,7 @@ static fupg_conn *fupg_connect(pTHX_ const char *str) { fupg_conn *c = safecalloc(1, sizeof(fupg_conn)); c->conn = conn; - return c; + return fupg_selfobj(c, "FU::PG::conn"); } static void fupg_destroy(fupg_conn *c) { @@ -79,7 +81,7 @@ static void fupg_destroy(fupg_conn *c) { safefree(c); } -static SV *fupg_exec(fupg_conn *c, const char *sql) { +static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) { PGresult *r = PQexec(c->conn, sql); if (!r) fupg_conn_croak(c, "exec"); switch (PQresultStatus(r)) { @@ -98,3 +100,99 @@ static SV *fupg_exec(fupg_conn *c, const char *sql) { PQclear(r); return ret; } + +typedef struct { + /* Set in $conn->q() */ + SV *self; + fupg_conn *conn; /* has a refcnt on conn->self */ + char *query; + SV **bind; + int bindn; + /* Set during prepare */ + char name[32]; + int prepared; + PGresult *describe; +} fupg_st; + +static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) { + fupg_st *st = safecalloc(1, sizeof(fupg_st)); + st->conn = c; + SvREFCNT_inc(c->self); + + st->query = savepv(query); + if (argc > 2) { + st->bind = safemalloc((argc-2) * sizeof(SV *)); + I32 i = 2; + while (i < argc) { + st->bind[st->bindn] = newSV(0); + sv_setsv(st->bind[st->bindn], ST(i)); + st->bindn++; + } + } + + return fupg_selfobj(st, "FU::PG::st"); +} + +static void fupg_st_prepare(pTHX_ fupg_st *st) { + if (st->describe) return; + if (st->prepared) croak("invalid attempt to re-prepare invalid statement"); + + /* TODO: This is where we check for any cached prepared statements */ + + snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter); + + /* TODO: Pipeline these two commands, no need for two round-trips with the server */ + PGresult *r = PQprepare(st->conn->conn, st->name, st->query, 0, NULL); + if (!r) fupg_conn_croak(st->conn , "prepare"); + if (PQresultStatus(r) != PGRES_COMMAND_OK) + fupg_result_croak(r, "prepare", st->query); + PQclear(r); + st->prepared = 1; + + r = PQdescribePrepared(st->conn->conn, st->name); + if (!r) fupg_conn_croak(st->conn , "prepare"); + if (PQresultStatus(r) != PGRES_COMMAND_OK) + fupg_result_croak(r, "prepare", st->query); + st->describe = r; +} + +static SV *fupg_st_params(pTHX_ fupg_st *st) { + fupg_st_prepare(st); + int i, nparams = PQnparams(st->describe); + AV *av = newAV_alloc_x(nparams); + for (i=0; idescribe, i))); + av_push_simple(av, newRV_noinc((SV *)hv)); + } + return sv_2mortal(newRV_noinc((SV *)av)); +} + +static SV *fupg_st_columns(pTHX_ fupg_st *st) { + fupg_st_prepare(st); + int i, ncols = PQnfields(st->describe); + AV *av = newAV_alloc_x(ncols); + for (i=0; idescribe, i); + hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1)); + hv_stores(hv, "oid", newSViv(PQftype(st->describe, i))); + int tmod = PQfmod(st->describe, i); + if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod)); + av_push_simple(av, newRV_noinc((SV *)hv)); + } + return sv_2mortal(newRV_noinc((SV *)av)); +} + +static void fupg_st_destroy(fupg_st *st) { + int i; + /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ + if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); + + if (st->query) safefree(st->query); + for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]); + if (st->bind) safefree(st->bind); + if (st->describe) PQclear(st->describe); + SvREFCNT_dec(st->conn->self); + safefree(st); +} diff --git a/t/pgconnect.t b/t/pgconnect.t index 023f0f2..b714401 100644 --- a/t/pgconnect.t +++ b/t/pgconnect.t @@ -17,10 +17,11 @@ okerr FATAL => connect => qr/missing "=" after "invalid"/; ok FU::PG::lib_version() > 100000; my $conn = FU::PG->connect($ENV{FU_TEST_DB}); +$conn->_debug_trace(0); is ref $conn, 'FU::PG::conn'; -ok $conn->server_version() > 100000; -is $conn->lib_version(), FU::PG::lib_version(); +ok $conn->server_version > 100000; +is $conn->lib_version, FU::PG::lib_version(); ok !eval { $conn->exec('COPY (SELECT 1) TO STDOUT'); }; okerr FATAL => exec => qr/unexpected status code/; @@ -31,4 +32,26 @@ okerr ERROR => exec => qr/syntax error/; ok !defined $conn->exec(''); is $conn->exec('SELECT 1'), 1; +ok !eval { $conn->q('SELEXT')->params; }; +okerr ERROR => prepare => qr/syntax error/; + +{ + my $st = $conn->q('SELECT 1'); + is_deeply $st->params, []; + is_deeply $st->columns, [{ name => '?column?', oid => 23 }]; + is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 1; +} + +is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 0; + +{ + my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\""); + undef $conn; # statement keeps the connection alive + is_deeply $st->params, [ { oid => 23 }, { oid => 1042 } ]; + is_deeply $st->columns, [ + { oid => 23, name => 'a' }, + { oid => 1042, name => "\x{1F603}", typemod => 9 }, + ]; +} + done_testing;