typedef struct { SV *self; PGconn *conn; UV prep_counter; UV cookie; /* currently active transaction object; 0 = none active */ } fupg_conn; static SV *fupg_conn_errsv(PGconn *conn, const char *action) { dTHX; HV *hv = newHV(); hv_stores(hv, "action", newSVpv(action, 0)); hv_stores(hv, "severity", newSVpvs("FATAL")); /* Connection-related errors are always fatal */ hv_stores(hv, "message", newSVpv(PQerrorMessage(conn), 0)); return fu_croak_hv(hv, "FU::PG::error", "FATAL: %s", PQerrorMessage(conn)); } static void fupg_conn_croak(fupg_conn *c, const char *action) { dTHX; croak_sv(fupg_conn_errsv(c->conn, action)); } /* Takes ownership of the PGresult and croaks. */ static void fupg_result_croak(PGresult *r, const char *action, const char *query) { dTHX; HV *hv = newHV(); hv_stores(hv, "action", newSVpv(action, 0)); char *s = PQresultErrorField(r, PG_DIAG_SEVERITY_NONLOCALIZED); hv_stores(hv, "severity", newSVpv(s ? s : "FATAL", 0)); if (query) hv_stores(hv, "query", newSVpv(query, 0)); /* If the PGresult is not an error, assume it's an unexpected resultStatus */ s = PQresultErrorField(r, PG_DIAG_MESSAGE_PRIMARY); hv_stores(hv, "message", s ? newSVpv(s, 0) : newSVpvf("unexpected status code '%s'", PQresStatus(PQresultStatus(r)))); /* I like the verbose error messages. Doesn't include anything that's not * also fetched below, but saves me from having to do the formatting * manually. */ char *verbose = NULL; if (s) { verbose = PQresultVerboseErrorMessage(r, PQERRORS_VERBOSE, PQSHOW_CONTEXT_ERRORS); if (s) { hv_stores(hv, "verbose_message", newSVpv(verbose, 0)); PQfreemem(verbose); } } if ((s = PQresultErrorField(r, PG_DIAG_MESSAGE_DETAIL))) hv_stores(hv, "detail", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_MESSAGE_HINT))) hv_stores(hv, "hint", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_STATEMENT_POSITION))) hv_stores(hv, "statement_position", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_INTERNAL_POSITION))) hv_stores(hv, "internal_position", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_INTERNAL_QUERY))) hv_stores(hv, "internal_query", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_CONTEXT))) hv_stores(hv, "context", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_SCHEMA_NAME))) hv_stores(hv, "schema_name", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_TABLE_NAME))) hv_stores(hv, "table_name", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_COLUMN_NAME))) hv_stores(hv, "column_name", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_DATATYPE_NAME))) hv_stores(hv, "datatype_name", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_CONSTRAINT_NAME))) hv_stores(hv, "constraint_name", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FILE))) hv_stores(hv, "source_file", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_LINE))) hv_stores(hv, "source_line", newSVpv(s, 0)); if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FUNCTION))) hv_stores(hv, "source_function", newSVpv(s, 0)); PQclear(r); croak_sv(verbose ? fu_croak_hv(hv, "FU::PG::error", "%s", SvPV_nolen(*hv_fetchs(hv, "verbose_message", 0))) : fu_croak_hv(hv, "FU::PG::error", "%s: %s", SvPV_nolen(*hv_fetchs(hv, "severity", 0)), SvPV_nolen(*hv_fetchs(hv, "message", 0)) ) ); } static SV *fupg_connect(pTHX_ const char *str) { PGconn *conn = PQconnectdb(str); if (PQstatus(conn) != CONNECTION_OK) { SV *sv = fupg_conn_errsv(conn, "connect"); PQfinish(conn); croak_sv(sv); } fupg_conn *c = safecalloc(1, sizeof(fupg_conn)); c->conn = conn; return fupg_selfobj(c, "FU::PG::conn"); } static const char *fupg_status(fupg_conn *c) { if (PQstatus(c->conn) == CONNECTION_BAD) return "bad"; switch (PQtransactionStatus(c->conn)) { case PQTRANS_IDLE: return c->cookie ? "txn_done" : "idle"; case PQTRANS_ACTIVE: return "active"; /* can't happen, we don't do async */ case PQTRANS_INTRANS: return "txn_idle"; case PQTRANS_INERROR: return "txn_error"; default: return "unknown"; } } static void fupg_disconnect(fupg_conn *c) { PQfinish(c->conn); c->conn = NULL; } static void fupg_destroy(fupg_conn *c) { PQfinish(c->conn); safefree(c); } static SV *fupg_exec_result(pTHX_ PGresult *r) { SV *ret = &PL_sv_undef; char *tup = PQcmdTuples(r); if (tup && *tup) { ret = sv_2mortal(newSVpv(tup, 0)); SvIV(ret); SvIOK_only(ret); } return ret; } static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) { PGresult *r = PQexec(c->conn, sql); if (!r) fupg_conn_croak(c, "exec"); switch (PQresultStatus(r)) { case PGRES_EMPTY_QUERY: case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: break; default: fupg_result_croak(r, "exec", sql); } SV *ret = fupg_exec_result(r); PQclear(r); return ret; } /* Validate a FU::PG::txn object and extract the connection */ static fupg_conn *fupg_get_transaction(pTHX_ SV *sv) { if (!sv_derived_from(sv, "FU::PG::txn")) goto invalid; sv = SvRV(sv); if (SvTYPE(sv) != SVt_PVAV) goto invalid; AV *av = (AV *)sv; SV **v = av_fetch(av, 0, 0); if (!v || !*v) goto invalid; fupg_conn *c = (fupg_conn *)SvIVX(SvRV(*v)); v = av_fetch(av, 1, 0); if (!v || !*v) goto invalid; if (!SvOK(*v)) fu_confess("Invalid attempt to run a query on a transaction that has already finished"); if (c->cookie != SvUV(*v)) fu_confess("Invalid cross-transaction operation"); return c; invalid: fu_confess("invalid transaction object"); } /* Read a Perl value from a PGresult. * Currently assumes text format and just creates a PV. */ static SV *fupg_val(pTHX_ const PGresult *r, int row, int col) { if (PQgetisnull(r, row, col)) return newSV(0); return newSVpvn_utf8(PQgetvalue(r, row, col), PQgetlength(r, row, col), 1); } typedef struct { /* Set in $conn->q() */ SV *self; fupg_conn *conn; /* has a refcnt on conn->self */ UV cookie; char *query; SV **bind; int bindn; /* Set during prepare */ int prepared; char name[32]; PGresult *describe; /* Set during execute */ int paramn; char **param; PGresult *result; } fupg_st; static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) { fupg_st *st = safecalloc(1, sizeof(fupg_st)); st->conn = c; st->cookie = c->cookie; SvREFCNT_inc(c->self); st->query = savepv(query); if (argc > 2) { st->bind = safemalloc((argc-2) * sizeof(SV *)); I32 i; for (i=2; i < argc; i++) { st->bind[st->bindn] = newSV(0); sv_setsv(st->bind[st->bindn], ST(i)); st->bindn++; } } return fupg_selfobj(st, "FU::PG::st"); } static void fupg_st_prepare(pTHX_ fupg_st *st) { if (st->describe) return; if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement"); /* TODO: This is where we check for any cached prepared statements */ snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter); /* TODO: Pipeline these two commands, no need for two round-trips with the server */ PGresult *r = PQprepare(st->conn->conn, st->name, st->query, 0, NULL); if (!r) fupg_conn_croak(st->conn , "prepare"); if (PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "prepare", st->query); PQclear(r); st->prepared = 1; r = PQdescribePrepared(st->conn->conn, st->name); if (!r) fupg_conn_croak(st->conn , "prepare"); if (PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "prepare", st->query); st->describe = r; } static SV *fupg_st_params(pTHX_ fupg_st *st) { fupg_st_prepare(aTHX_ st); int i, nparams = PQnparams(st->describe); AV *av = newAV_alloc_x(nparams); for (i=0; idescribe, i))); av_push_simple(av, newRV_noinc((SV *)hv)); } return sv_2mortal(newRV_noinc((SV *)av)); } static SV *fupg_st_columns(pTHX_ fupg_st *st) { fupg_st_prepare(aTHX_ st); int i, ncols = PQnfields(st->describe); AV *av = newAV_alloc_x(ncols); for (i=0; idescribe, i); hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1)); hv_stores(hv, "oid", newSViv(PQftype(st->describe, i))); int tmod = PQfmod(st->describe, i); if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod)); av_push_simple(av, newRV_noinc((SV *)hv)); } return sv_2mortal(newRV_noinc((SV *)av)); } static void fupg_st_check_dupcols(pTHX_ PGresult *r) { HV *hv = newHV(); int i, nfields = PQnfields(r); for (i=0; iresult) fu_confess("Invalid attempt to execute statement multiple times"); /* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */ fupg_st_prepare(aTHX_ st); st->param = safemalloc(st->bindn * sizeof(char *)); int i; for (i=0; ibindn; i++) { st->param[i] = SvPVutf8_nolen(st->bind[i]); st->paramn++; } /* I'm not super fond of this approach. Storing the full query results in a * PGresult involves unnecessary parsing, memory allocation and copying. * The wire protocol is sufficiently simple that I could parse the query * results directly from the network buffers without much additional code, * and that would be much more efficient. Alas, libpq doesn't let me do * that. * There is the option of fetching results in chunked mode, but from what I * gather that just saves a bit of memory in exchange for more and smaller * malloc()/free()'s. Performance-wise, it probably won't be much of an * improvement */ PGresult *r = PQexecPrepared(st->conn->conn, st->name, st->paramn, (const char * const*)st->param, NULL, NULL, 0); if (!r) fupg_conn_croak(st->conn , "exec"); switch (PQresultStatus(r)) { case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: break; default: fupg_result_croak(r, "exec", st->query); } st->result = r; } static SV *fupg_st_exec(pTHX_ fupg_st *st) { fupg_st_execute(aTHX_ st); return fupg_exec_result(st->result); } static SV *fupg_st_val(pTHX_ fupg_st *st) { fupg_st_prepare(aTHX_ st); if (PQnfields(st->describe) > 1) fu_confess("Invalid use of $st->val() on query returning more than one column"); if (PQnfields(st->describe) == 0) fu_confess("Invalid use of $st->val() on query returning no data"); fupg_st_execute(aTHX_ st); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->val() on query returning more than one row"); SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_val(aTHX_ st->result, 0, 0); return sv_2mortal(sv); } static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) { dSP; fupg_st_execute(aTHX_ st); if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); if (GIMME_V != G_LIST) { ST(0) = sv_2mortal(newSViv(PQnfields(st->result))); return 1; } int i, nfields = PQnfields(st->result); (void)POPs; EXTEND(SP, nfields); for (i=0; iresult, 0, i)); return nfields; } static SV *fupg_st_rowa(pTHX_ fupg_st *st) { fupg_st_execute(aTHX_ st); if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); int i, nfields = PQnfields(st->result); AV *av = newAV_alloc_x(nfields); SV *sv = sv_2mortal(newRV_noinc((SV *)av)); for (i=0; iresult, 0, i)); return sv; } static SV *fupg_st_rowh(pTHX_ fupg_st *st) { fupg_st_prepare(aTHX_ st); fupg_st_check_dupcols(aTHX_ st->describe); fupg_st_execute(aTHX_ st); if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowh() on query returning zero rows"); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row"); int i, nfields = PQnfields(st->result); HV *hv = newHV(); SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); for (i=0; iresult, i); hv_store(hv, key, -strlen(key), fupg_val(aTHX_ st->result, 0, i), 0); } return sv; } static void fupg_st_destroy(fupg_st *st) { int i; /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); if (st->query) safefree(st->query); for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]); if (st->bind) safefree(st->bind); /* XXX: These point into bind SVs (for now): for (i=0; i < st->paramn; i++) safefree(st->param[i]); */ if (st->param) safefree(st->param); PQclear(st->describe); PQclear(st->result); SvREFCNT_dec(st->conn->self); safefree(st); } /* TODO: $st->alla, allh, flat, kvv, kva, kvh */ /* TODO: Prepared statement caching */ /* TODO: Binary format fetching & type handling */