pg: Add transaction & subtransaction support
Was expecting the implementation of this to get overly complicated and brittle, but using a counter-based cookie and doing parts of it in Perl made it pretty easy actually. Pretty happy with how this turned out so far. TODO: documentation -.-
This commit is contained in:
parent
9d5905e3b4
commit
171afc0268
5 changed files with 276 additions and 10 deletions
47
FU.xs
47
FU.xs
|
|
@ -14,6 +14,15 @@
|
|||
#include "c/pgconn.c"
|
||||
|
||||
|
||||
#define FUPG_CONN_COOKIE \
|
||||
if (c->cookie) croak("Invalid attempt to run a query on the top-level connection while a transaction object exists")
|
||||
|
||||
#define FUPG_ST_COOKIE \
|
||||
if (st->cookie != st->conn->cookie) croak("Invalid cross-transaction operation on statement object")
|
||||
|
||||
typedef fupg_conn *fupg_txn;
|
||||
|
||||
|
||||
MODULE = FU
|
||||
|
||||
PROTOTYPES: DISABLE
|
||||
|
|
@ -22,6 +31,7 @@ PROTOTYPES: DISABLE
|
|||
TYPEMAP: <<EOT
|
||||
TYPEMAP
|
||||
fupg_conn * FUPG_CONN
|
||||
fupg_txn FUPG_TXN
|
||||
fupg_st * FUPG_ST
|
||||
|
||||
INPUT
|
||||
|
|
@ -29,6 +39,9 @@ FUPG_CONN
|
|||
if (sv_derived_from($arg, \"FU::PG::conn\")) $var = (fupg_conn *)SvIVX(SvRV($arg));
|
||||
else croak(\"invalid connection object\");
|
||||
|
||||
FUPG_TXN
|
||||
$var = fupg_get_transaction(aTHX_ $arg);
|
||||
|
||||
FUPG_ST
|
||||
if (sv_derived_from($arg, \"FU::PG::st\")) $var = (fupg_st *)SvIVX(SvRV($arg));
|
||||
else croak(\"invalid statement object\");
|
||||
|
|
@ -80,12 +93,28 @@ void _debug_trace(fupg_conn *c, bool on)
|
|||
else PQuntrace(c->conn);
|
||||
ST(0) = c->self;
|
||||
|
||||
void _set_cookie(fupg_conn *c, UV cookie)
|
||||
CODE:
|
||||
c->cookie = cookie;
|
||||
|
||||
UV _get_cookie(fupg_conn *c)
|
||||
CODE:
|
||||
RETVAL = c->cookie;
|
||||
OUTPUT:
|
||||
RETVAL
|
||||
|
||||
void status(fupg_conn *c)
|
||||
CODE:
|
||||
ST(0) = sv_2mortal(newSVpv(fupg_status(c), 0));
|
||||
|
||||
void exec(fupg_conn *c, SV *sv)
|
||||
CODE:
|
||||
FUPG_CONN_COOKIE;
|
||||
ST(0) = fupg_exec(aTHX_ c, SvPVutf8_nolen(sv));
|
||||
|
||||
void q(fupg_conn *c, SV *sv, ...)
|
||||
CODE:
|
||||
FUPG_CONN_COOKIE;
|
||||
ST(0) = fupg_q(aTHX_ c, SvPVutf8_nolen(sv), ax, items);
|
||||
|
||||
void DESTROY(fupg_conn *c)
|
||||
|
|
@ -93,34 +122,52 @@ void DESTROY(fupg_conn *c)
|
|||
fupg_destroy(c);
|
||||
|
||||
|
||||
MODULE = FU PACKAGE = FU::PG::txn
|
||||
|
||||
void exec(fupg_txn c, SV *sv)
|
||||
CODE:
|
||||
ST(0) = fupg_exec(aTHX_ c, SvPVutf8_nolen(sv));
|
||||
|
||||
void q(fupg_txn c, SV *sv, ...)
|
||||
CODE:
|
||||
ST(0) = fupg_q(aTHX_ c, SvPVutf8_nolen(sv), ax, items);
|
||||
|
||||
|
||||
MODULE = FU PACKAGE = FU::PG::st
|
||||
|
||||
void params(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
ST(0) = fupg_st_params(aTHX_ st);
|
||||
|
||||
void columns(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
ST(0) = fupg_st_columns(aTHX_ st);
|
||||
|
||||
void exec(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
ST(0) = fupg_st_exec(aTHX_ st);
|
||||
|
||||
void val(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
ST(0) = fupg_st_val(aTHX_ st);
|
||||
|
||||
void rowl(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
XSRETURN(fupg_st_rowl(aTHX_ st, ax));
|
||||
|
||||
void rowa(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
ST(0) = fupg_st_rowa(aTHX_ st);
|
||||
|
||||
void rowh(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
ST(0) = fupg_st_rowh(aTHX_ st);
|
||||
|
||||
void DESTROY(fupg_st *st)
|
||||
|
|
|
|||
50
FU/PG.pm
50
FU/PG.pm
|
|
@ -6,8 +6,58 @@ _load_libpq();
|
|||
|
||||
package FU::PG::conn {
|
||||
sub lib_version { FU::PG::lib_version() }
|
||||
|
||||
sub txn($c) {
|
||||
$c->exec('BEGIN');
|
||||
$c->_set_cookie(++$FU::PG::txn::COUNTER);
|
||||
bless [$c, $FU::PG::txn::COUNTER, undef], 'FU::PG::txn';
|
||||
}
|
||||
};
|
||||
|
||||
package FU::PG::txn {
|
||||
use Carp 'croak';
|
||||
|
||||
my $COUNTER = 0;
|
||||
|
||||
# Arrayref:
|
||||
# 0: $conn
|
||||
# 1: $cookie, a snapshot of $COUNTER that identifies this transaction, used
|
||||
# to match commands against transactions. Set to undef when this
|
||||
# transaction is 'done' but the object is still alive.
|
||||
# 2: $parent, undef if this is a top-level transaction.
|
||||
|
||||
sub commit($t) {
|
||||
croak "Unable to commit transaction that has already finished" if !$t->[1];
|
||||
$t->exec($t->[2] ? "RELEASE SAVEPOINT fupg_$t->[1]" : 'COMMIT');
|
||||
$t->[1] = undef;
|
||||
}
|
||||
|
||||
sub rollback($t) {
|
||||
croak "Unable to rollback transaction that has already finished" if !$t->[1];
|
||||
$t->exec($t->[2] ? "ROLLBACK TO SAVEPOINT fupg_$t->[1]" : 'ROLLBACK');
|
||||
$t->[1] = undef;
|
||||
}
|
||||
|
||||
sub txn($t) {
|
||||
croak "Unable to create sub-transaction when current transaction has already finished" if !$t->[1];
|
||||
$COUNTER++;
|
||||
$t->exec("SAVEPOINT fupg_$COUNTER");
|
||||
$t->[0]->_set_cookie($COUNTER);
|
||||
bless [$t->[0], $COUNTER, $t], 'FU::PG::txn';
|
||||
}
|
||||
|
||||
sub status($t) {
|
||||
my $cs = $t->[0]->status;
|
||||
return $cs if $cs eq 'bad' || ($t->[1] && $t->[0]->_get_cookie != $t->[1]);
|
||||
return $cs eq 'txn_error' ? 'error' : $t->[1] ? 'idle' : 'done';
|
||||
}
|
||||
|
||||
sub DESTROY($t) {
|
||||
$t->rollback if $t->[1];
|
||||
$t->[0]->_set_cookie($t->[2] ? $t->[2][1] : 0);
|
||||
}
|
||||
}
|
||||
|
||||
package FU::PG::error {
|
||||
use overload '""' => sub($e, @) { $e->{full_message} };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,8 @@ typedef enum {
|
|||
} ExecStatusType;
|
||||
typedef enum { PQERRORS_TERSE, PQERRORS_DEFAULT, PQERRORS_VERBOSE, PQERRORS_SQLSTATE } PGVerbosity;
|
||||
typedef enum { PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS, PQSHOW_CONTEXT_ALWAYS } PGContextVisibility;
|
||||
typedef enum { CONNECTION_OK, CONNECTION_BAD } ConnStatusType; /* There's more, but they're irrelevant to us */
|
||||
typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, PQTRANS_UNKNOWN } PGTransactionStatusType;
|
||||
|
||||
#define PG_DIAG_SEVERITY 'S'
|
||||
#define PG_DIAG_SEVERITY_NONLOCALIZED 'V'
|
||||
|
|
@ -62,8 +64,9 @@ typedef enum { PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS, PQSHOW_CONTEXT_ALWAY
|
|||
X(PQresultStatus, ExecStatusType, const PGresult *) \
|
||||
X(PQresultVerboseErrorMessage, char *, const PGresult *, PGVerbosity, PGContextVisibility) \
|
||||
X(PQserverVersion, int, const PGconn *) \
|
||||
X(PQstatus, int, const PGconn *) \
|
||||
X(PQstatus, ConnStatusType, const PGconn *) \
|
||||
X(PQtrace, void, PGconn *, FILE *) \
|
||||
X(PQtransactionStatus, PGTransactionStatusType, const PGconn *) \
|
||||
X(PQuntrace, void, PGconn *)
|
||||
|
||||
#define X(n, r, ...) static r (*n)(__VA_ARGS__);
|
||||
|
|
|
|||
38
c/pgconn.c
38
c/pgconn.c
|
|
@ -2,6 +2,7 @@ typedef struct {
|
|||
SV *self;
|
||||
PGconn *conn;
|
||||
UV prep_counter;
|
||||
UV cookie; /* currently active transaction object; 0 = none active */
|
||||
} fupg_conn;
|
||||
|
||||
|
||||
|
|
@ -71,7 +72,7 @@ static void fupg_result_croak(PGresult *r, const char *action, const char *query
|
|||
|
||||
static SV *fupg_connect(pTHX_ const char *str) {
|
||||
PGconn *conn = PQconnectdb(str);
|
||||
if (PQstatus(conn) != 0) {
|
||||
if (PQstatus(conn) != CONNECTION_OK) {
|
||||
SV *sv = fupg_conn_errsv(conn, "connect");
|
||||
PQfinish(conn);
|
||||
croak_sv(sv);
|
||||
|
|
@ -82,6 +83,17 @@ static SV *fupg_connect(pTHX_ const char *str) {
|
|||
return fupg_selfobj(c, "FU::PG::conn");
|
||||
}
|
||||
|
||||
static const char *fupg_status(fupg_conn *c) {
|
||||
if (PQstatus(c->conn) == CONNECTION_BAD) return "bad";
|
||||
switch (PQtransactionStatus(c->conn)) {
|
||||
case PQTRANS_IDLE: return c->cookie ? "txn_done" : "idle";
|
||||
case PQTRANS_ACTIVE: return "active"; /* can't happen, we don't do async */
|
||||
case PQTRANS_INTRANS: return "txn_idle";
|
||||
case PQTRANS_INERROR: return "txn_error";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
static void fupg_destroy(fupg_conn *c) {
|
||||
PQfinish(c->conn);
|
||||
safefree(c);
|
||||
|
|
@ -112,6 +124,26 @@ static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
/* Validate a FU::PG::txn object and extract the connection */
|
||||
static fupg_conn *fupg_get_transaction(pTHX_ SV *sv) {
|
||||
if (!sv_derived_from(sv, "FU::PG::txn")) goto invalid;
|
||||
sv = SvRV(sv);
|
||||
if (SvTYPE(sv) != SVt_PVAV) goto invalid;
|
||||
AV *av = (AV *)sv;
|
||||
|
||||
SV **v = av_fetch(av, 0, 0);
|
||||
if (!v || !*v) goto invalid;
|
||||
fupg_conn *c = (fupg_conn *)SvIVX(SvRV(*v));
|
||||
|
||||
v = av_fetch(av, 1, 0);
|
||||
if (!v || !*v) goto invalid;
|
||||
if (!SvOK(*v)) croak("Invalid attempt to run a query on a transaction that has already finished");
|
||||
if (c->cookie != SvUV(*v)) croak("Invalid cross-transaction operation");
|
||||
return c;
|
||||
invalid:
|
||||
croak("invalid transaction object");
|
||||
}
|
||||
|
||||
/* Read a Perl value from a PGresult.
|
||||
* Currently assumes text format and just creates a PV. */
|
||||
static SV *fupg_val(pTHX_ const PGresult *r, int row, int col) {
|
||||
|
|
@ -123,6 +155,7 @@ typedef struct {
|
|||
/* Set in $conn->q() */
|
||||
SV *self;
|
||||
fupg_conn *conn; /* has a refcnt on conn->self */
|
||||
UV cookie;
|
||||
char *query;
|
||||
SV **bind;
|
||||
int bindn;
|
||||
|
|
@ -139,6 +172,7 @@ typedef struct {
|
|||
static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
|
||||
fupg_st *st = safecalloc(1, sizeof(fupg_st));
|
||||
st->conn = c;
|
||||
st->cookie = c->cookie;
|
||||
SvREFCNT_inc(c->self);
|
||||
|
||||
st->query = savepv(query);
|
||||
|
|
@ -249,7 +283,6 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
|
|||
PGresult *r = PQexecPrepared(st->conn->conn, st->name, st->paramn, (const char * const*)st->param, NULL, NULL, 0);
|
||||
if (!r) fupg_conn_croak(st->conn , "exec");
|
||||
switch (PQresultStatus(r)) {
|
||||
case PGRES_EMPTY_QUERY:
|
||||
case PGRES_COMMAND_OK:
|
||||
case PGRES_TUPLES_OK: break;
|
||||
default: fupg_result_croak(r, "exec", st->query);
|
||||
|
|
@ -334,5 +367,4 @@ static void fupg_st_destroy(fupg_st *st) {
|
|||
|
||||
/* TODO: $st->alla, allh, flat, kvv, kva, kvh */
|
||||
/* TODO: Prepared statement caching */
|
||||
/* TODO: Transactions */
|
||||
/* TODO: Binary format fetching & type handling */
|
||||
|
|
|
|||
146
t/pgconnect.t
146
t/pgconnect.t
|
|
@ -2,6 +2,7 @@ use v5.36;
|
|||
use Test::More;
|
||||
|
||||
plan skip_all => $@ if !eval { require FU::PG; } && $@ =~ /Unable to load libpq/;
|
||||
die $@ if $@;
|
||||
plan skip_all => 'Please set FU_TEST_DB to a PostgreSQL connection string to run these tests' if !$ENV{FU_TEST_DB};
|
||||
|
||||
sub okerr($sev, $act, $msg) {
|
||||
|
|
@ -22,6 +23,7 @@ $conn->_debug_trace(0);
|
|||
is ref $conn, 'FU::PG::conn';
|
||||
ok $conn->server_version > 100000;
|
||||
is $conn->lib_version, FU::PG::lib_version();
|
||||
is $conn->status, 'idle';
|
||||
|
||||
subtest '$conn->exec', sub {
|
||||
ok !eval { $conn->exec('COPY (SELECT 1) TO STDOUT'); };
|
||||
|
|
@ -35,6 +37,8 @@ subtest '$conn->exec', sub {
|
|||
|
||||
ok !eval { $conn->q('SELEXT')->params; };
|
||||
okerr ERROR => prepare => qr/syntax error/;
|
||||
|
||||
is $conn->exec('SET client_encoding=utf8'), undef;
|
||||
};
|
||||
|
||||
|
||||
|
|
@ -45,6 +49,18 @@ subtest '$st prepare & exec', sub {
|
|||
is_deeply $st->columns, [{ name => '?column?', oid => 23 }];
|
||||
is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 1;
|
||||
is $st->exec, 1;
|
||||
ok !eval { $st->exec; 1 };
|
||||
like $@, qr/Invalid attempt to execute statement multiple times/;
|
||||
}
|
||||
|
||||
{
|
||||
my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\"", 1, 2);
|
||||
is_deeply $st->params, [ { oid => 23 }, { oid => 1042 } ];
|
||||
is_deeply $st->columns, [
|
||||
{ oid => 23, name => 'a' },
|
||||
{ oid => 1042, name => "\x{1F603}", typemod => 9 },
|
||||
];
|
||||
is $st->exec, 1;
|
||||
}
|
||||
|
||||
is $conn->exec('SELECT 1 FROM pg_prepared_statements'), 0;
|
||||
|
|
@ -54,6 +70,15 @@ subtest '$st prepare & exec', sub {
|
|||
|
||||
ok !eval { $conn->q('SELECT $1')->exec; 1 };
|
||||
okerr ERROR => exec => qr/bind message supplies 0 parameters, but prepared statement/;
|
||||
|
||||
# prepare + describe won't let us detect empty queries, hmm...
|
||||
is_deeply $conn->q('')->params, [];
|
||||
is_deeply $conn->q('')->columns, [];
|
||||
|
||||
ok !eval { $conn->q('')->exec; 1 };
|
||||
okerr FATAL => exec => qr/unexpected status code/;
|
||||
|
||||
is $conn->q('SET client_encoding=utf8')->exec, undef;
|
||||
};
|
||||
|
||||
subtest '$st->val', sub {
|
||||
|
|
@ -112,14 +137,123 @@ subtest '$st->rowh', sub {
|
|||
is_deeply $conn->q('SELECT 1 as a, 2 as b')->rowh, {a => 1, b => 2};
|
||||
};
|
||||
|
||||
subtest 'txn', sub {
|
||||
$conn->exec('CREATE TEMPORARY TABLE fupg_tst (id int)');
|
||||
$conn->txn->exec('INSERT INTO fupg_tst VALUES (1)'); # rolled back
|
||||
is $conn->q('SELECT COUNT(*) FROM fupg_tst')->val, 0;
|
||||
|
||||
my $st = $conn->q('SELECT COUNT(*) FROM fupg_tst');
|
||||
my $sst;
|
||||
{
|
||||
my $txn = $conn->txn;
|
||||
is $conn->status, 'txn_idle';
|
||||
is $txn->status, 'idle';
|
||||
|
||||
ok !eval { $st->exec; 1 };
|
||||
like $@, qr/Invalid cross-transaction/;
|
||||
|
||||
ok !eval { $conn->exec('SELECT 1'); 1 };
|
||||
like $@, qr/Invalid attempt to run a query/;
|
||||
ok !eval { $conn->q('SELECT 1'); 1 };
|
||||
like $@, qr/Invalid attempt to run a query/;
|
||||
ok !eval { $conn->txn; 1 };
|
||||
like $@, qr/Invalid attempt to run a query/;
|
||||
|
||||
$txn->exec('INSERT INTO fupg_tst VALUES (1)');
|
||||
$sst = $txn->q('SELECT 1');
|
||||
|
||||
is $conn->status, 'txn_idle';
|
||||
is $txn->status, 'idle';
|
||||
$txn->commit;
|
||||
is $conn->status, 'txn_done';
|
||||
is $txn->status, 'done';
|
||||
|
||||
ok !eval { $txn->rollback; 1 };
|
||||
like $@, qr/Unable to rollback/;
|
||||
ok !eval { $txn->commit; 1 };
|
||||
like $@, qr/Unable to commit/;
|
||||
ok !eval { $txn->txn; 1 };
|
||||
like $@, qr/Unable to create/;
|
||||
ok !eval { $txn->exec('select 1'); 1 };
|
||||
like $@, qr/Invalid attempt to run a query/;
|
||||
ok !eval { $txn->q('select 1'); 1 };
|
||||
like $@, qr/Invalid attempt to run a query/;
|
||||
|
||||
ok !eval { $conn->exec('SELECT 1'); 1 };
|
||||
like $@, qr/Invalid attempt to run a query/;
|
||||
}
|
||||
is $conn->status, 'idle';
|
||||
is $st->val, 1;
|
||||
ok !eval { $sst->exec; 1 };
|
||||
like $@, qr/Invalid cross-transaction/;
|
||||
|
||||
{
|
||||
my $txn = $conn->txn;
|
||||
ok !eval { $txn->exec('SELEXT'); 1 }; # puts txn in error state
|
||||
is $conn->status, 'txn_error';
|
||||
is $txn->status, 'error';
|
||||
ok !eval { $txn->exec('SELECT 1'); 1 };
|
||||
like $@, qr/current transaction is aborted/;
|
||||
|
||||
$txn->rollback;
|
||||
is $conn->status, 'txn_done';
|
||||
is $txn->status, 'done';
|
||||
}
|
||||
ok $conn->exec('SELECT 1');
|
||||
|
||||
{
|
||||
my $txn = $conn->txn;
|
||||
my $st = $txn->q('SELECT count(*) FROM fupg_tst WHERE id = 2');
|
||||
{
|
||||
my $sub = $txn->txn;
|
||||
is $conn->status, 'txn_idle';
|
||||
is $txn->status, 'txn_idle';
|
||||
is $sub->status, 'idle';
|
||||
|
||||
$sub->exec('INSERT INTO fupg_tst VALUES (2)');
|
||||
ok !eval { $sub->exec('SELEXT'); 1 };
|
||||
|
||||
ok !eval { $txn->rollback; 1 };
|
||||
like $@, qr/Invalid cross-transaction/;
|
||||
|
||||
is $conn->status, 'txn_error';
|
||||
is $txn->status, 'txn_error';
|
||||
is $sub->status, 'error';
|
||||
}
|
||||
is $conn->status, 'txn_idle';
|
||||
is $txn->status, 'idle';
|
||||
is $st->val, 0;
|
||||
|
||||
$st = $txn->q('SELECT count(*) FROM fupg_tst WHERE id = 2');
|
||||
{
|
||||
my $sub = $txn->txn;
|
||||
$sub->exec('INSERT INTO fupg_tst VALUES (2)');
|
||||
$sub->commit;
|
||||
is $conn->status, 'txn_idle';
|
||||
is $txn->status, 'txn_idle'; # No way to tell that it's actually done
|
||||
is $sub->status, 'done';
|
||||
}
|
||||
is $st->val, 1;
|
||||
}
|
||||
is $conn->status, 'idle';
|
||||
|
||||
{
|
||||
my $txn = $conn->txn;
|
||||
my $sub = $txn->txn;
|
||||
undef $txn; # sub keeps a ref on $txn
|
||||
is $sub->status, 'idle';
|
||||
is $conn->status, 'txn_idle';
|
||||
$sub->exec('INSERT INTO fupg_tst VALUES (3)');
|
||||
$sub->commit;
|
||||
}
|
||||
# We didn't commit $txn, so $sub got aborted as well
|
||||
is $conn->q('SELECT count(*) FROM fupg_tst WHERE id = 3')->val, 0;
|
||||
};
|
||||
|
||||
{
|
||||
my $st = $conn->q("SELECT \$1::int AS a, \$2::char(5) AS \"\x{1F603}\"");
|
||||
my $st = $conn->q("SELECT 1");
|
||||
undef $conn; # statement keeps the connection alive
|
||||
is_deeply $st->params, [ { oid => 23 }, { oid => 1042 } ];
|
||||
is_deeply $st->columns, [
|
||||
{ oid => 23, name => 'a' },
|
||||
{ oid => 1042, name => "\x{1F603}", typemod => 9 },
|
||||
];
|
||||
is $st->val, 1;
|
||||
}
|
||||
|
||||
done_testing;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue