pg: Rework txn implementation + statement config API
I liked the Perl implementation of transactions, but managing state between Perl and C is a bit cumbersome, so I've moved the whole thing into C. Also added a few statement configuration methods that currently don't do anything yet.
This commit is contained in:
parent
8f94dd0921
commit
166744dd51
7 changed files with 335 additions and 186 deletions
|
|
@ -1,11 +1,11 @@
|
|||
/* Because I don't know how to use sv_setref_pv() correctly. */
|
||||
|
||||
static SV *fupg_selfobj_(pTHX_ SV **self, void *obj, const char *klass) {
|
||||
static SV *fu_selfobj_(pTHX_ SV **self, void *obj, const char *klass) {
|
||||
*self = newSViv(PTR2IV(obj));
|
||||
return sv_bless(sv_2mortal(newRV_noinc(*self)), gv_stashpv(klass, GV_ADD));
|
||||
}
|
||||
/* Write a blessed SV to obj->self and returns a mortal ref to it */
|
||||
#define fupg_selfobj(obj, klass) fupg_selfobj_(aTHX_ &((obj)->self), obj, klass)
|
||||
#define fu_selfobj(obj, klass) fu_selfobj_(aTHX_ &((obj)->self), obj, klass)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
257
c/pgconn.c
257
c/pgconn.c
|
|
@ -1,11 +1,55 @@
|
|||
#define FUPG_CACHE 1
|
||||
#define FUPG_TEXT_PARAMS 2
|
||||
#define FUPG_TEXT_RESULTS 4
|
||||
#define FUPG_TEXT (FUPG_TEXT_PARAMS|FUPG_TEXT_RESULTS)
|
||||
|
||||
typedef struct {
|
||||
SV *self;
|
||||
PGconn *conn;
|
||||
UV prep_counter;
|
||||
UV cookie_counter;
|
||||
UV cookie; /* currently active transaction object; 0 = none active */
|
||||
int stflags;
|
||||
} fupg_conn;
|
||||
|
||||
|
||||
struct fupg_txn {
|
||||
SV *self;
|
||||
struct fupg_txn *parent;
|
||||
fupg_conn *conn;
|
||||
UV cookie; /* 0 means done */
|
||||
int stflags;
|
||||
char rollback_cmd[64];
|
||||
};
|
||||
typedef struct fupg_txn fupg_txn;
|
||||
|
||||
typedef struct {
|
||||
/* Set in $conn->q() */
|
||||
SV *self; /* (unused, but whatever) */
|
||||
fupg_conn *conn; /* has a refcnt on conn->self */
|
||||
UV cookie;
|
||||
char *query;
|
||||
SV **bind;
|
||||
int nbind;
|
||||
int stflags;
|
||||
/* Set during prepare */
|
||||
int prepared;
|
||||
char name[32];
|
||||
PGresult *describe;
|
||||
/* Set during execute */
|
||||
int nparam;
|
||||
int nfields;
|
||||
char **param;
|
||||
const fupg_type **recv;
|
||||
void **recvctx;
|
||||
PGresult *result;
|
||||
} fupg_st;
|
||||
|
||||
|
||||
|
||||
|
||||
/* Utilities */
|
||||
|
||||
static SV *fupg_conn_errsv(PGconn *conn, const char *action) {
|
||||
dTHX;
|
||||
HV *hv = newHV();
|
||||
|
|
@ -15,12 +59,14 @@ static SV *fupg_conn_errsv(PGconn *conn, const char *action) {
|
|||
return fu_croak_hv(hv, "FU::PG::error", "FATAL: %s", PQerrorMessage(conn));
|
||||
}
|
||||
|
||||
__attribute__((noreturn))
|
||||
static void fupg_conn_croak(fupg_conn *c, const char *action) {
|
||||
dTHX;
|
||||
croak_sv(fupg_conn_errsv(c->conn, action));
|
||||
}
|
||||
|
||||
/* Takes ownership of the PGresult and croaks. */
|
||||
__attribute__((noreturn))
|
||||
static void fupg_result_croak(PGresult *r, const char *action, const char *query) {
|
||||
dTHX;
|
||||
HV *hv = newHV();
|
||||
|
|
@ -70,40 +116,6 @@ static void fupg_result_croak(PGresult *r, const char *action, const char *query
|
|||
);
|
||||
}
|
||||
|
||||
static SV *fupg_connect(pTHX_ const char *str) {
|
||||
PGconn *conn = PQconnectdb(str);
|
||||
if (PQstatus(conn) != CONNECTION_OK) {
|
||||
SV *sv = fupg_conn_errsv(conn, "connect");
|
||||
PQfinish(conn);
|
||||
croak_sv(sv);
|
||||
}
|
||||
|
||||
fupg_conn *c = safecalloc(1, sizeof(fupg_conn));
|
||||
c->conn = conn;
|
||||
return fupg_selfobj(c, "FU::PG::conn");
|
||||
}
|
||||
|
||||
static const char *fupg_status(fupg_conn *c) {
|
||||
if (PQstatus(c->conn) == CONNECTION_BAD) return "bad";
|
||||
switch (PQtransactionStatus(c->conn)) {
|
||||
case PQTRANS_IDLE: return c->cookie ? "txn_done" : "idle";
|
||||
case PQTRANS_ACTIVE: return "active"; /* can't happen, we don't do async */
|
||||
case PQTRANS_INTRANS: return "txn_idle";
|
||||
case PQTRANS_INERROR: return "txn_error";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
static void fupg_disconnect(fupg_conn *c) {
|
||||
PQfinish(c->conn);
|
||||
c->conn = NULL;
|
||||
}
|
||||
|
||||
static void fupg_destroy(fupg_conn *c) {
|
||||
PQfinish(c->conn);
|
||||
safefree(c);
|
||||
}
|
||||
|
||||
static SV *fupg_exec_result(pTHX_ PGresult *r) {
|
||||
SV *ret = &PL_sv_undef;
|
||||
char *tup = PQcmdTuples(r);
|
||||
|
|
@ -115,6 +127,130 @@ static SV *fupg_exec_result(pTHX_ PGresult *r) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
static void fupg_exec_ok(pTHX_ fupg_conn *c, const char *sql) {
|
||||
PGresult *r = PQexec(c->conn, sql);
|
||||
if (!r) fupg_conn_croak(c, "exec");
|
||||
if (PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "exec", sql);
|
||||
PQclear(r);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* Connection & transaction handling */
|
||||
|
||||
static SV *fupg_connect(pTHX_ const char *str) {
|
||||
PGconn *conn = PQconnectdb(str);
|
||||
if (PQstatus(conn) != CONNECTION_OK) {
|
||||
SV *sv = fupg_conn_errsv(conn, "connect");
|
||||
PQfinish(conn);
|
||||
croak_sv(sv);
|
||||
}
|
||||
|
||||
fupg_conn *c = safecalloc(1, sizeof(fupg_conn));
|
||||
c->conn = conn;
|
||||
return fu_selfobj(c, "FU::PG::conn");
|
||||
}
|
||||
|
||||
static const char *fupg_conn_status(fupg_conn *c) {
|
||||
if (PQstatus(c->conn) == CONNECTION_BAD) return "bad";
|
||||
switch (PQtransactionStatus(c->conn)) {
|
||||
case PQTRANS_IDLE: return c->cookie ? "txn_done" : "idle";
|
||||
case PQTRANS_ACTIVE: return "active"; /* can't happen, we don't do async */
|
||||
case PQTRANS_INTRANS: return "txn_idle";
|
||||
case PQTRANS_INERROR: return "txn_error";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
static void fupg_conn_disconnect(fupg_conn *c) {
|
||||
PQfinish(c->conn);
|
||||
c->conn = NULL;
|
||||
}
|
||||
|
||||
static void fupg_conn_destroy(fupg_conn *c) {
|
||||
PQfinish(c->conn);
|
||||
safefree(c);
|
||||
}
|
||||
|
||||
static SV *fupg_conn_txn(pTHX_ fupg_conn *c) {
|
||||
fupg_exec_ok(c, "BEGIN");
|
||||
fupg_txn *t = safecalloc(1, sizeof(fupg_txn));
|
||||
t->conn = c;
|
||||
t->cookie = c->cookie = ++c->cookie_counter;
|
||||
t->stflags = c->stflags;
|
||||
strcpy(t->rollback_cmd, "ROLLBACK");
|
||||
SvREFCNT_inc(c->self);
|
||||
return fu_selfobj(t, "FU::PG::txn");
|
||||
}
|
||||
|
||||
static SV *fupg_txn_txn(pTHX_ fupg_txn *t) {
|
||||
char cmd[64];
|
||||
UV cookie = ++t->conn->cookie_counter;
|
||||
snprintf(cmd, sizeof(cmd), "SAVEPOINT fupg_%"UVuf, cookie);
|
||||
fupg_exec_ok(t->conn, cmd);
|
||||
|
||||
fupg_txn *n = safecalloc(1, sizeof(fupg_txn));
|
||||
n->conn = t->conn;
|
||||
n->parent = t;
|
||||
n->cookie = t->conn->cookie = cookie;
|
||||
n->stflags = t->stflags;
|
||||
snprintf(n->rollback_cmd, sizeof(n->rollback_cmd), "ROLLBACK TO SAVEPOINT fupg_%"UVuf, cookie);
|
||||
SvREFCNT_inc(t->self);
|
||||
return fu_selfobj(n, "FU::PG::txn");
|
||||
}
|
||||
|
||||
static const char *fupg_txn_status(fupg_txn *t) {
|
||||
if (PQstatus(t->conn->conn) == CONNECTION_BAD) return "bad";
|
||||
if (!t->cookie) return "done";
|
||||
int a = t->cookie == t->conn->cookie;
|
||||
switch (PQtransactionStatus(t->conn->conn)) {
|
||||
case PQTRANS_IDLE: return "done";
|
||||
case PQTRANS_ACTIVE: return "active";
|
||||
case PQTRANS_INTRANS: return a ? "idle" : "txn_idle";
|
||||
case PQTRANS_INERROR: return a ? "error" : "txn_error";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
static void fupg_txn_commit(pTHX_ fupg_txn *t) {
|
||||
char cmd[64];
|
||||
if (t->parent) snprintf(cmd, sizeof(cmd), "RELEASE SAVEPOINT fupg_%"UVuf, t->cookie);
|
||||
else strcpy(cmd, "COMMIT");
|
||||
t->cookie = 0;
|
||||
fupg_exec_ok(t->conn, cmd);
|
||||
}
|
||||
|
||||
static void fupg_txn_rollback(pTHX_ fupg_txn *t) {
|
||||
t->cookie = 0;
|
||||
fupg_exec_ok(t->conn, t->rollback_cmd);
|
||||
}
|
||||
|
||||
static void fupg_txn_destroy(pTHX_ fupg_txn *t) {
|
||||
if (t->cookie) {
|
||||
PGresult *r = PQexec(t->conn->conn, t->rollback_cmd);
|
||||
/* Can't really throw an error in DESTROY. If a rollback command fails,
|
||||
* we're sufficiently screwed that the only sensible recourse is to
|
||||
* disconnect and let any further operations throw an error. */
|
||||
if (!r || PQresultStatus(r) != PGRES_COMMAND_OK)
|
||||
fupg_conn_disconnect(t->conn);
|
||||
PQclear(r);
|
||||
}
|
||||
if (t->parent) {
|
||||
t->conn->cookie = t->parent->cookie;
|
||||
SvREFCNT_dec(t->parent->self);
|
||||
} else {
|
||||
t->conn->cookie = 0;
|
||||
SvREFCNT_dec(t->conn->self);
|
||||
}
|
||||
safefree(t);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* Querying */
|
||||
|
||||
static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
|
||||
PGresult *r = PQexec(c->conn, sql);
|
||||
if (!r) fupg_conn_croak(c, "exec");
|
||||
|
|
@ -129,54 +265,11 @@ static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
/* Validate a FU::PG::txn object and extract the connection */
|
||||
static fupg_conn *fupg_get_transaction(pTHX_ SV *sv) {
|
||||
if (!sv_derived_from(sv, "FU::PG::txn")) goto invalid;
|
||||
sv = SvRV(sv);
|
||||
if (SvTYPE(sv) != SVt_PVAV) goto invalid;
|
||||
AV *av = (AV *)sv;
|
||||
|
||||
SV **v = av_fetch(av, 0, 0);
|
||||
if (!v || !*v) goto invalid;
|
||||
fupg_conn *c = (fupg_conn *)SvIVX(SvRV(*v));
|
||||
|
||||
v = av_fetch(av, 1, 0);
|
||||
if (!v || !*v) goto invalid;
|
||||
if (!SvOK(*v)) fu_confess("Invalid attempt to run a query on a transaction that has already finished");
|
||||
if (c->cookie != SvUV(*v)) fu_confess("Invalid cross-transaction operation");
|
||||
return c;
|
||||
invalid:
|
||||
fu_confess("invalid transaction object");
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
/* Set in $conn->q() */
|
||||
SV *self;
|
||||
fupg_conn *conn; /* has a refcnt on conn->self */
|
||||
UV cookie;
|
||||
char *query;
|
||||
SV **bind;
|
||||
int nbind;
|
||||
bool text_params;
|
||||
bool text_results;
|
||||
/* Set during prepare */
|
||||
int prepared;
|
||||
char name[32];
|
||||
PGresult *describe;
|
||||
/* Set during execute */
|
||||
int nparam;
|
||||
int nfields;
|
||||
char **param;
|
||||
const fupg_type **recv;
|
||||
void **recvctx;
|
||||
PGresult *result;
|
||||
} fupg_st;
|
||||
|
||||
static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
|
||||
static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I32 argc) {
|
||||
fupg_st *st = safecalloc(1, sizeof(fupg_st));
|
||||
st->conn = c;
|
||||
st->cookie = c->cookie;
|
||||
st->text_params = st->text_results = true; /* TODO: default to false */
|
||||
st->stflags = stflags;
|
||||
SvREFCNT_inc(c->self);
|
||||
|
||||
st->query = savepv(query);
|
||||
|
|
@ -190,7 +283,7 @@ static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
|
|||
}
|
||||
}
|
||||
|
||||
return fupg_selfobj(st, "FU::PG::st");
|
||||
return fu_selfobj(st, "FU::PG::st");
|
||||
}
|
||||
|
||||
static void fupg_st_prepare(pTHX_ fupg_st *st) {
|
||||
|
|
@ -287,7 +380,7 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
|
|||
* improvement */
|
||||
PGresult *r = PQexecPrepared(st->conn->conn,
|
||||
st->name, st->nparam, (const char * const*)st->param,
|
||||
NULL, NULL, st->text_results ? 0 : 1);
|
||||
NULL, NULL, st->stflags & FUPG_TEXT_RESULTS ? 0 : 1);
|
||||
if (!r) fupg_conn_croak(st->conn , "exec");
|
||||
switch (PQresultStatus(r)) {
|
||||
case PGRES_COMMAND_OK:
|
||||
|
|
@ -300,7 +393,7 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
|
|||
st->recv = safecalloc(st->nfields, sizeof(*st->recv));
|
||||
st->recvctx = safecalloc(st->nfields, sizeof(*st->recvctx));
|
||||
for (i=0; i<st->nfields; i++) {
|
||||
st->recv[i] = fupg_type_lookup(st->text_results ? 0 : PQftype(r, i));
|
||||
st->recv[i] = fupg_type_lookup(st->stflags & FUPG_TEXT_RESULTS ? 0 : PQftype(r, i));
|
||||
if (!st->recv[i])
|
||||
fu_confess("Unable to receive query results of type %u", PQftype(r, i));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,9 @@
|
|||
typedef void (*fupg_send_fn)(pTHX_ SV *, fustr *, void *);
|
||||
|
||||
/* Receive function, takes a binary string and should return a Perl value.
|
||||
* libpq guarantees that the given buffer is aligned to MAXIMUM_ALIGNOF. */
|
||||
* libpq guarantees that the given buffer is aligned to MAXIMUM_ALIGNOF.
|
||||
* For fixed-length types, the recv function is only called after verifying
|
||||
* that the input buffer has the correct length. */
|
||||
typedef SV *(*fupg_recv_fn)(pTHX_ const char *, int, void *);
|
||||
|
||||
typedef struct {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue