From 171afc02689e44ab2ee66d14815574ce9b9b8589 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Thu, 6 Feb 2025 17:38:33 +0100 Subject: [PATCH] pg: Add transaction & subtransaction support Was expecting the implementation of this to get overly complicated and brittle, but using a counter-based cookie and doing parts of it in Perl made it pretty easy actually. Pretty happy with how this turned out so far. TODO: documentation -.- --- FU.xs | 47 ++++++++++++++++ FU/PG.pm | 50 +++++++++++++++++ c/libpq.h | 5 +- c/pgconn.c | 38 +++++++++++-- t/pgconnect.t | 146 +++++++++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 276 insertions(+), 10 deletions(-) diff --git a/FU.xs b/FU.xs index 4883bc0..8799efb 100644 --- a/FU.xs +++ b/FU.xs @@ -14,6 +14,15 @@ #include "c/pgconn.c" +#define FUPG_CONN_COOKIE \ + if (c->cookie) croak("Invalid attempt to run a query on the top-level connection while a transaction object exists") + +#define FUPG_ST_COOKIE \ + if (st->cookie != st->conn->cookie) croak("Invalid cross-transaction operation on statement object") + +typedef fupg_conn *fupg_txn; + + MODULE = FU PROTOTYPES: DISABLE @@ -22,6 +31,7 @@ PROTOTYPES: DISABLE TYPEMAP: <conn); ST(0) = c->self; +void _set_cookie(fupg_conn *c, UV cookie) + CODE: + c->cookie = cookie; + +UV _get_cookie(fupg_conn *c) + CODE: + RETVAL = c->cookie; + OUTPUT: + RETVAL + +void status(fupg_conn *c) + CODE: + ST(0) = sv_2mortal(newSVpv(fupg_status(c), 0)); + void exec(fupg_conn *c, SV *sv) CODE: + FUPG_CONN_COOKIE; ST(0) = fupg_exec(aTHX_ c, SvPVutf8_nolen(sv)); void q(fupg_conn *c, SV *sv, ...) CODE: + FUPG_CONN_COOKIE; ST(0) = fupg_q(aTHX_ c, SvPVutf8_nolen(sv), ax, items); void DESTROY(fupg_conn *c) @@ -93,34 +122,52 @@ void DESTROY(fupg_conn *c) fupg_destroy(c); +MODULE = FU PACKAGE = FU::PG::txn + +void exec(fupg_txn c, SV *sv) + CODE: + ST(0) = fupg_exec(aTHX_ c, SvPVutf8_nolen(sv)); + +void q(fupg_txn c, SV *sv, ...) + CODE: + ST(0) = fupg_q(aTHX_ c, SvPVutf8_nolen(sv), ax, items); + + MODULE = FU PACKAGE = FU::PG::st void params(fupg_st *st) CODE: + FUPG_ST_COOKIE; ST(0) = fupg_st_params(aTHX_ st); void columns(fupg_st *st) CODE: + FUPG_ST_COOKIE; ST(0) = fupg_st_columns(aTHX_ st); void exec(fupg_st *st) CODE: + FUPG_ST_COOKIE; ST(0) = fupg_st_exec(aTHX_ st); void val(fupg_st *st) CODE: + FUPG_ST_COOKIE; ST(0) = fupg_st_val(aTHX_ st); void rowl(fupg_st *st) CODE: + FUPG_ST_COOKIE; XSRETURN(fupg_st_rowl(aTHX_ st, ax)); void rowa(fupg_st *st) CODE: + FUPG_ST_COOKIE; ST(0) = fupg_st_rowa(aTHX_ st); void rowh(fupg_st *st) CODE: + FUPG_ST_COOKIE; ST(0) = fupg_st_rowh(aTHX_ st); void DESTROY(fupg_st *st) diff --git a/FU/PG.pm b/FU/PG.pm index 5d4843e..0dd398b 100644 --- a/FU/PG.pm +++ b/FU/PG.pm @@ -6,8 +6,58 @@ _load_libpq(); package FU::PG::conn { sub lib_version { FU::PG::lib_version() } + + sub txn($c) { + $c->exec('BEGIN'); + $c->_set_cookie(++$FU::PG::txn::COUNTER); + bless [$c, $FU::PG::txn::COUNTER, undef], 'FU::PG::txn'; + } }; +package FU::PG::txn { + use Carp 'croak'; + + my $COUNTER = 0; + + # Arrayref: + # 0: $conn + # 1: $cookie, a snapshot of $COUNTER that identifies this transaction, used + # to match commands against transactions. Set to undef when this + # transaction is 'done' but the object is still alive. + # 2: $parent, undef if this is a top-level transaction. + + sub commit($t) { + croak "Unable to commit transaction that has already finished" if !$t->[1]; + $t->exec($t->[2] ? "RELEASE SAVEPOINT fupg_$t->[1]" : 'COMMIT'); + $t->[1] = undef; + } + + sub rollback($t) { + croak "Unable to rollback transaction that has already finished" if !$t->[1]; + $t->exec($t->[2] ? "ROLLBACK TO SAVEPOINT fupg_$t->[1]" : 'ROLLBACK'); + $t->[1] = undef; + } + + sub txn($t) { + croak "Unable to create sub-transaction when current transaction has already finished" if !$t->[1]; + $COUNTER++; + $t->exec("SAVEPOINT fupg_$COUNTER"); + $t->[0]->_set_cookie($COUNTER); + bless [$t->[0], $COUNTER, $t], 'FU::PG::txn'; + } + + sub status($t) { + my $cs = $t->[0]->status; + return $cs if $cs eq 'bad' || ($t->[1] && $t->[0]->_get_cookie != $t->[1]); + return $cs eq 'txn_error' ? 'error' : $t->[1] ? 'idle' : 'done'; + } + + sub DESTROY($t) { + $t->rollback if $t->[1]; + $t->[0]->_set_cookie($t->[2] ? $t->[2][1] : 0); + } +} + package FU::PG::error { use overload '""' => sub($e, @) { $e->{full_message} }; } diff --git a/c/libpq.h b/c/libpq.h index ed71002..c62b3f5 100644 --- a/c/libpq.h +++ b/c/libpq.h @@ -13,6 +13,8 @@ typedef enum { } ExecStatusType; typedef enum { PQERRORS_TERSE, PQERRORS_DEFAULT, PQERRORS_VERBOSE, PQERRORS_SQLSTATE } PGVerbosity; typedef enum { PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS, PQSHOW_CONTEXT_ALWAYS } PGContextVisibility; +typedef enum { CONNECTION_OK, CONNECTION_BAD } ConnStatusType; /* There's more, but they're irrelevant to us */ +typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, PQTRANS_UNKNOWN } PGTransactionStatusType; #define PG_DIAG_SEVERITY 'S' #define PG_DIAG_SEVERITY_NONLOCALIZED 'V' @@ -62,8 +64,9 @@ typedef enum { PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS, PQSHOW_CONTEXT_ALWAY X(PQresultStatus, ExecStatusType, const PGresult *) \ X(PQresultVerboseErrorMessage, char *, const PGresult *, PGVerbosity, PGContextVisibility) \ X(PQserverVersion, int, const PGconn *) \ - X(PQstatus, int, const PGconn *) \ + X(PQstatus, ConnStatusType, const PGconn *) \ X(PQtrace, void, PGconn *, FILE *) \ + X(PQtransactionStatus, PGTransactionStatusType, const PGconn *) \ X(PQuntrace, void, PGconn *) #define X(n, r, ...) static r (*n)(__VA_ARGS__); diff --git a/c/pgconn.c b/c/pgconn.c index 89e0a6c..b24a913 100644 --- a/c/pgconn.c +++ b/c/pgconn.c @@ -2,6 +2,7 @@ typedef struct { SV *self; PGconn *conn; UV prep_counter; + UV cookie; /* currently active transaction object; 0 = none active */ } fupg_conn; @@ -71,7 +72,7 @@ static void fupg_result_croak(PGresult *r, const char *action, const char *query static SV *fupg_connect(pTHX_ const char *str) { PGconn *conn = PQconnectdb(str); - if (PQstatus(conn) != 0) { + if (PQstatus(conn) != CONNECTION_OK) { SV *sv = fupg_conn_errsv(conn, "connect"); PQfinish(conn); croak_sv(sv); @@ -82,6 +83,17 @@ static SV *fupg_connect(pTHX_ const char *str) { return fupg_selfobj(c, "FU::PG::conn"); } +static const char *fupg_status(fupg_conn *c) { + if (PQstatus(c->conn) == CONNECTION_BAD) return "bad"; + switch (PQtransactionStatus(c->conn)) { + case PQTRANS_IDLE: return c->cookie ? "txn_done" : "idle"; + case PQTRANS_ACTIVE: return "active"; /* can't happen, we don't do async */ + case PQTRANS_INTRANS: return "txn_idle"; + case PQTRANS_INERROR: return "txn_error"; + default: return "unknown"; + } +} + static void fupg_destroy(fupg_conn *c) { PQfinish(c->conn); safefree(c); @@ -112,6 +124,26 @@ static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) { return ret; } +/* Validate a FU::PG::txn object and extract the connection */ +static fupg_conn *fupg_get_transaction(pTHX_ SV *sv) { + if (!sv_derived_from(sv, "FU::PG::txn")) goto invalid; + sv = SvRV(sv); + if (SvTYPE(sv) != SVt_PVAV) goto invalid; + AV *av = (AV *)sv; + + SV **v = av_fetch(av, 0, 0); + if (!v || !*v) goto invalid; + fupg_conn *c = (fupg_conn *)SvIVX(SvRV(*v)); + + v = av_fetch(av, 1, 0); + if (!v || !*v) goto invalid; + if (!SvOK(*v)) croak("Invalid attempt to run a query on a transaction that has already finished"); + if (c->cookie != SvUV(*v)) croak("Invalid cross-transaction operation"); + return c; +invalid: + croak("invalid transaction object"); +} + /* Read a Perl value from a PGresult. * Currently assumes text format and just creates a PV. */ static SV *fupg_val(pTHX_ const PGresult *r, int row, int col) { @@ -123,6 +155,7 @@ typedef struct { /* Set in $conn->q() */ SV *self; fupg_conn *conn; /* has a refcnt on conn->self */ + UV cookie; char *query; SV **bind; int bindn; @@ -139,6 +172,7 @@ typedef struct { static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) { fupg_st *st = safecalloc(1, sizeof(fupg_st)); st->conn = c; + st->cookie = c->cookie; SvREFCNT_inc(c->self); st->query = savepv(query); @@ -249,7 +283,6 @@ static void fupg_st_execute(pTHX_ fupg_st *st) { PGresult *r = PQexecPrepared(st->conn->conn, st->name, st->paramn, (const char * const*)st->param, NULL, NULL, 0); if (!r) fupg_conn_croak(st->conn , "exec"); switch (PQresultStatus(r)) { - case PGRES_EMPTY_QUERY: case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: break; default: fupg_result_croak(r, "exec", st->query); @@ -334,5 +367,4 @@ static void fupg_st_destroy(fupg_st *st) { /* TODO: $st->alla, allh, flat, kvv, kva, kvh */ /* TODO: Prepared statement caching */ -/* TODO: Transactions */ /* TODO: Binary format fetching & type handling */ diff --git a/t/pgconnect.t b/t/pgconnect.t index 033e5a9..588bef7 100644 --- a/t/pgconnect.t +++ b/t/pgconnect.t @@ -2,6 +2,7 @@ use v5.36; use Test::More; plan skip_all => $@ if !eval { require FU::PG; } && $@ =~ /Unable to load libpq/; +die $@ if $@; plan skip_all => 'Please set FU_TEST_DB to a PostgreSQL connection string to run these tests' if !$ENV{FU_TEST_DB}; sub okerr($sev, $act, $msg) { @@ -22,6 +23,7 @@ $conn->_debug_trace(0); is ref $conn, 'FU::PG::conn'; ok $conn->server_version > 100000; is $conn->lib_version, FU::PG::lib_version(); +is $conn->status, 'idle'; subtest '$conn->exec', sub { ok !eval { $conn->exec('COPY (SELECT 1) TO STDOUT'); }; @@ -35,6 +37,8 @@ subtest '$conn->exec', sub { ok !eval { $conn->q('SELEXT')->params; }; okerr ERROR => prepare => qr/syntax error/; + + is $conn->exec('SET client_encoding=utf8'), undef; }; @@ -45,6 +49,18 @@ subtest '$st prepare & exec', sub { is_deeply $st->columns, [{ name => '?column?', oid => 23 }]; is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 1; is $st->exec, 1; + ok !eval { $st->exec; 1 }; + like $@, qr/Invalid attempt to execute statement multiple times/; + } + + { + my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\"", 1, 2); + is_deeply $st->params, [ { oid => 23 }, { oid => 1042 } ]; + is_deeply $st->columns, [ + { oid => 23, name => 'a' }, + { oid => 1042, name => "\x{1F603}", typemod => 9 }, + ]; + is $st->exec, 1; } is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 0; @@ -54,6 +70,15 @@ subtest '$st prepare & exec', sub { ok !eval { $conn->q('SELECT $1')->exec; 1 }; okerr ERROR => exec => qr/bind message supplies 0 parameters, but prepared statement/; + + # prepare + describe won't let us detect empty queries, hmm... + is_deeply $conn->q('')->params, []; + is_deeply $conn->q('')->columns, []; + + ok !eval { $conn->q('')->exec; 1 }; + okerr FATAL => exec => qr/unexpected status code/; + + is $conn->q('SET client_encoding=utf8')->exec, undef; }; subtest '$st->val', sub { @@ -112,14 +137,123 @@ subtest '$st->rowh', sub { is_deeply $conn->q('SELECT 1 as a, 2 as b')->rowh, {a => 1, b => 2}; }; +subtest 'txn', sub { + $conn->exec('CREATE TEMPORARY TABLE fupg_tst (id int)'); + $conn->txn->exec('INSERT INTO fupg_tst VALUES (1)'); # rolled back + is $conn->q('SELECT COUNT(*) FROM fupg_tst')->val, 0; + + my $st = $conn->q('SELECT COUNT(*) FROM fupg_tst'); + my $sst; + { + my $txn = $conn->txn; + is $conn->status, 'txn_idle'; + is $txn->status, 'idle'; + + ok !eval { $st->exec; 1 }; + like $@, qr/Invalid cross-transaction/; + + ok !eval { $conn->exec('SELECT 1'); 1 }; + like $@, qr/Invalid attempt to run a query/; + ok !eval { $conn->q('SELECT 1'); 1 }; + like $@, qr/Invalid attempt to run a query/; + ok !eval { $conn->txn; 1 }; + like $@, qr/Invalid attempt to run a query/; + + $txn->exec('INSERT INTO fupg_tst VALUES (1)'); + $sst = $txn->q('SELECT 1'); + + is $conn->status, 'txn_idle'; + is $txn->status, 'idle'; + $txn->commit; + is $conn->status, 'txn_done'; + is $txn->status, 'done'; + + ok !eval { $txn->rollback; 1 }; + like $@, qr/Unable to rollback/; + ok !eval { $txn->commit; 1 }; + like $@, qr/Unable to commit/; + ok !eval { $txn->txn; 1 }; + like $@, qr/Unable to create/; + ok !eval { $txn->exec('select 1'); 1 }; + like $@, qr/Invalid attempt to run a query/; + ok !eval { $txn->q('select 1'); 1 }; + like $@, qr/Invalid attempt to run a query/; + + ok !eval { $conn->exec('SELECT 1'); 1 }; + like $@, qr/Invalid attempt to run a query/; + } + is $conn->status, 'idle'; + is $st->val, 1; + ok !eval { $sst->exec; 1 }; + like $@, qr/Invalid cross-transaction/; + + { + my $txn = $conn->txn; + ok !eval { $txn->exec('SELEXT'); 1 }; # puts txn in error state + is $conn->status, 'txn_error'; + is $txn->status, 'error'; + ok !eval { $txn->exec('SELECT 1'); 1 }; + like $@, qr/current transaction is aborted/; + + $txn->rollback; + is $conn->status, 'txn_done'; + is $txn->status, 'done'; + } + ok $conn->exec('SELECT 1'); + + { + my $txn = $conn->txn; + my $st = $txn->q('SELECT count(*) FROM fupg_tst WHERE id = 2'); + { + my $sub = $txn->txn; + is $conn->status, 'txn_idle'; + is $txn->status, 'txn_idle'; + is $sub->status, 'idle'; + + $sub->exec('INSERT INTO fupg_tst VALUES (2)'); + ok !eval { $sub->exec('SELEXT'); 1 }; + + ok !eval { $txn->rollback; 1 }; + like $@, qr/Invalid cross-transaction/; + + is $conn->status, 'txn_error'; + is $txn->status, 'txn_error'; + is $sub->status, 'error'; + } + is $conn->status, 'txn_idle'; + is $txn->status, 'idle'; + is $st->val, 0; + + $st = $txn->q('SELECT count(*) FROM fupg_tst WHERE id = 2'); + { + my $sub = $txn->txn; + $sub->exec('INSERT INTO fupg_tst VALUES (2)'); + $sub->commit; + is $conn->status, 'txn_idle'; + is $txn->status, 'txn_idle'; # No way to tell that it's actually done + is $sub->status, 'done'; + } + is $st->val, 1; + } + is $conn->status, 'idle'; + + { + my $txn = $conn->txn; + my $sub = $txn->txn; + undef $txn; # sub keeps a ref on $txn + is $sub->status, 'idle'; + is $conn->status, 'txn_idle'; + $sub->exec('INSERT INTO fupg_tst VALUES (3)'); + $sub->commit; + } + # We didn't commit $txn, so $sub got aborted as well + is $conn->q('SELECT count(*) FROM fupg_tst WHERE id = 3')->val, 0; +}; + { - my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\""); + my $st = $conn->q("SELECT 1"); undef $conn; # statement keeps the connection alive - is_deeply $st->params, [ { oid => 23 }, { oid => 1042 } ]; - is_deeply $st->columns, [ - { oid => 23, name => 'a' }, - { oid => 1042, name => "\x{1F603}", typemod => 9 }, - ]; + is $st->val, 1; } done_testing;