fu/c/pgconn.c

198 lines
7.2 KiB
C

typedef struct {
SV *self;
PGconn *conn;
UV prep_counter;
} fupg_conn;
static SV *fupg_conn_errsv(PGconn *conn, const char *action) {
dTHX;
HV *hv = newHV();
hv_stores(hv, "action", newSVpv(action, 0));
hv_stores(hv, "severity", newSVpvs("FATAL")); /* Connection-related errors are always fatal */
hv_stores(hv, "message", newSVpv(PQerrorMessage(conn), 0));
return sv_bless(sv_2mortal(newRV_noinc((SV *)hv)), gv_stashpvs("FU::PG::error", GV_ADD));
}
static void fupg_conn_croak(fupg_conn *c, const char *action) {
dTHX;
croak_sv(fupg_conn_errsv(c->conn, action));
}
/* Takes ownership of the PGresult and croaks. */
static void fupg_result_croak(PGresult *r, const char *action, const char *query) {
dTHX;
HV *hv = newHV();
char *s;
hv_stores(hv, "action", newSVpv(action, 0));
s = PQresultErrorField(r, PG_DIAG_SEVERITY_NONLOCALIZED);
hv_stores(hv, "severity", newSVpv(s ? s : "FATAL", 0));
if (query) hv_stores(hv, "query", newSVpv(s, 0));
/* If the PGresult is not an error, assume it's an unexpected resultStatus */
s = PQresultErrorField(r, PG_DIAG_MESSAGE_PRIMARY);
hv_stores(hv, "message", s ? newSVpv(s, 0) : newSVpvf("unexpected status code '%s'", PQresStatus(PQresultStatus(r))));
/* I like the verbose error messages. Doesn't include anything that's not
* also fetched below, but saves me from having to do the formatting
* manually. */
if (s) {
s = PQresultVerboseErrorMessage(r, 2 /* PQERRORS_VERBOSE */, 1 /* PQSHOW_CONTEXT_ERRORS */);
if (s) {
hv_stores(hv, "verbose_message", newSVpv(s, 0));
PQfreemem(s);
}
}
if ((s = PQresultErrorField(r, PG_DIAG_MESSAGE_DETAIL))) hv_stores(hv, "detail", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_MESSAGE_HINT))) hv_stores(hv, "hint", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_STATEMENT_POSITION))) hv_stores(hv, "statement_position", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_INTERNAL_POSITION))) hv_stores(hv, "internal_position", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_INTERNAL_QUERY))) hv_stores(hv, "internal_query", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_CONTEXT))) hv_stores(hv, "context", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_SCHEMA_NAME))) hv_stores(hv, "schema_name", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_TABLE_NAME))) hv_stores(hv, "table_name", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_COLUMN_NAME))) hv_stores(hv, "column_name", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_DATATYPE_NAME))) hv_stores(hv, "datatype_name", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_CONSTRAINT_NAME))) hv_stores(hv, "constraint_name", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FILE))) hv_stores(hv, "source_file", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_LINE))) hv_stores(hv, "source_line", newSVpv(s, 0));
if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FUNCTION))) hv_stores(hv, "source_function", newSVpv(s, 0));
PQclear(r);
croak_sv(sv_bless(sv_2mortal(newRV_noinc((SV *)hv)), gv_stashpvs("FU::PG::error", GV_ADD)));
}
static SV *fupg_connect(pTHX_ const char *str) {
PGconn *conn = PQconnectdb(str);
if (PQstatus(conn) != 0) {
SV *sv = fupg_conn_errsv(conn, "connect");
PQfinish(conn);
croak_sv(sv);
}
fupg_conn *c = safecalloc(1, sizeof(fupg_conn));
c->conn = conn;
return fupg_selfobj(c, "FU::PG::conn");
}
static void fupg_destroy(fupg_conn *c) {
PQfinish(c->conn);
safefree(c);
}
static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
PGresult *r = PQexec(c->conn, sql);
if (!r) fupg_conn_croak(c, "exec");
switch (PQresultStatus(r)) {
case PGRES_EMPTY_QUERY:
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK: break;
default: fupg_result_croak(r, "exec", sql);
}
SV *ret = &PL_sv_undef;
char *tup = PQcmdTuples(r);
if (tup && *tup) {
ret = sv_2mortal(newSVpv(tup, 0));
SvIV(ret);
SvIOK_only(ret);
}
PQclear(r);
return ret;
}
typedef struct {
/* Set in $conn->q() */
SV *self;
fupg_conn *conn; /* has a refcnt on conn->self */
char *query;
SV **bind;
int bindn;
/* Set during prepare */
char name[32];
int prepared;
PGresult *describe;
} fupg_st;
static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
fupg_st *st = safecalloc(1, sizeof(fupg_st));
st->conn = c;
SvREFCNT_inc(c->self);
st->query = savepv(query);
if (argc > 2) {
st->bind = safemalloc((argc-2) * sizeof(SV *));
I32 i = 2;
while (i < argc) {
st->bind[st->bindn] = newSV(0);
sv_setsv(st->bind[st->bindn], ST(i));
st->bindn++;
}
}
return fupg_selfobj(st, "FU::PG::st");
}
static void fupg_st_prepare(pTHX_ fupg_st *st) {
if (st->describe) return;
if (st->prepared) croak("invalid attempt to re-prepare invalid statement");
/* TODO: This is where we check for any cached prepared statements */
snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter);
/* TODO: Pipeline these two commands, no need for two round-trips with the server */
PGresult *r = PQprepare(st->conn->conn, st->name, st->query, 0, NULL);
if (!r) fupg_conn_croak(st->conn , "prepare");
if (PQresultStatus(r) != PGRES_COMMAND_OK)
fupg_result_croak(r, "prepare", st->query);
PQclear(r);
st->prepared = 1;
r = PQdescribePrepared(st->conn->conn, st->name);
if (!r) fupg_conn_croak(st->conn , "prepare");
if (PQresultStatus(r) != PGRES_COMMAND_OK)
fupg_result_croak(r, "prepare", st->query);
st->describe = r;
}
static SV *fupg_st_params(pTHX_ fupg_st *st) {
fupg_st_prepare(st);
int i, nparams = PQnparams(st->describe);
AV *av = newAV_alloc_x(nparams);
for (i=0; i<nparams; i++) {
HV *hv = newHV();
hv_stores(hv, "oid", newSViv(PQparamtype(st->describe, i)));
av_push_simple(av, newRV_noinc((SV *)hv));
}
return sv_2mortal(newRV_noinc((SV *)av));
}
static SV *fupg_st_columns(pTHX_ fupg_st *st) {
fupg_st_prepare(st);
int i, ncols = PQnfields(st->describe);
AV *av = newAV_alloc_x(ncols);
for (i=0; i<ncols; i++) {
HV *hv = newHV();
const char *name = PQfname(st->describe, i);
hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1));
hv_stores(hv, "oid", newSViv(PQftype(st->describe, i)));
int tmod = PQfmod(st->describe, i);
if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod));
av_push_simple(av, newRV_noinc((SV *)hv));
}
return sv_2mortal(newRV_noinc((SV *)av));
}
static void fupg_st_destroy(fupg_st *st) {
int i;
/* Ignore failure, this is just a best-effort attempt to free up resources on the backend */
if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name));
if (st->query) safefree(st->query);
for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]);
if (st->bind) safefree(st->bind);
if (st->describe) PQclear(st->describe);
SvREFCNT_dec(st->conn->self);
safefree(st);
}