pg: Some refactoring + more result fetching methods

This commit is contained in:
Yorhel 2025-02-10 15:48:08 +01:00
parent d95ff76d43
commit ccc2f1dbf0
4 changed files with 535 additions and 332 deletions

421
c/pgst.c Normal file
View file

@ -0,0 +1,421 @@
/* State of a single statement object, from creation through prepare and
 * execute up to destruction. The struct is allocated zeroed, so every field
 * that has not been set yet is 0/NULL. */
typedef struct {
/* Set on creation */
SV *self; /* (unused, but whatever) */
fupg_conn *conn; /* has a refcnt on conn->self */
UV cookie; /* copy of conn->cookie taken when the statement was created */
char *query; /* private copy of the SQL text (savepv), freed on destroy */
SV **bind; /* bind parameter SVs, one refcnt held on each; NULL when there are none */
int nbind; /* number of entries in bind */
int stflags; /* flag bits; FUPG_TEXT_PARAMS and FUPG_TEXT_RESULTS are tested in this file */
/* Set during prepare */
int prepared; /* 1 once the server accepted the prepare; destroy then closes the statement */
char name[32]; /* prepared statement name: "fupg" followed by a per-connection counter */
PGresult *describe; /* describe result; non-NULL means prepare has completed */
/* Set during execute */
int nfields; /* number of result columns, also the number of entries in recv */
const char **param_values; /* Points into conn->buf or st->bind SVs, may be invalid after exec */
int *param_lengths; /* per-parameter byte lengths; left NULL for text-format parameters */
int *param_formats; /* per-parameter format codes (1 when set); left NULL for text-format parameters */
fupg_tio send; /* scratch type I/O context, set up and freed again for each parameter */
fupg_tio *recv; /* array of nfields type I/O contexts, one per result column */
PGresult *result; /* execute result; non-NULL means the statement has been executed */
} fupg_st;
/* Run a plain SQL string through PQexec() and convert the outcome with
 * fupg_exec_result(). Croaks when libpq hands back no result at all, or when
 * the result status is anything other than empty-query, command-ok or
 * tuples-ok. The PGresult is released before returning. */
static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
    PGresult *res = PQexec(c->conn, sql);
    if (res == NULL)
        fupg_conn_croak(c, "exec");

    int status = PQresultStatus(res);
    if (status != PGRES_EMPTY_QUERY
            && status != PGRES_COMMAND_OK
            && status != PGRES_TUPLES_OK)
        fupg_result_croak(res, "exec", sql);

    SV *out = fupg_exec_result(res);
    PQclear(res);
    return out;
}
/* Create a new statement object for `query` on connection `c`.
 *
 * The statement takes a reference on c->self and keeps its own copy of the
 * query text. Stack items ST(2) .. ST(argc-1) are taken as bind parameters:
 * get-magic is triggered on each one and a reference to the SV is retained.
 * Nothing is sent to the server here.
 *
 * Returns the value of fu_selfobj() for the "FU::PG::st" class. */
static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I32 argc) {
    fupg_st *st = safecalloc(1, sizeof(fupg_st));
    I32 nargs = argc - 2;

    st->query = savepv(query);
    st->stflags = stflags;
    st->cookie = c->cookie;
    st->conn = c;
    SvREFCNT_inc(c->self);

    if (nargs > 0) {
        st->bind = safemalloc(nargs * sizeof(SV *));
        for (I32 n = 0; n < nargs; n++) {
            SvGETMAGIC(ST(n + 2));
            st->bind[n] = SvREFCNT_inc(ST(n + 2));
            /* Keep nbind in step so a partially filled array is still consistent */
            st->nbind = n + 1;
        }
    }
    return fu_selfobj(st, "FU::PG::st");
}
/* Release everything owned by a statement object, including the object
 * itself: the server-side prepared statement (if any), the query copy, the
 * bind parameter references, the parameter arrays, the type I/O contexts,
 * both PGresults and the reference on the connection. */
static void fupg_st_destroy(fupg_st *st) {
    int n;

    /* Best-effort attempt to free up resources on the backend; a failure
     * to close the prepared statement is deliberately ignored. */
    if (st->prepared) {
        PGresult *closed = PQclosePrepared(st->conn->conn, st->name);
        PQclear(closed);
    }

    safefree(st->query);

    for (n = 0; n < st->nbind; n++)
        SvREFCNT_dec(st->bind[n]);
    safefree(st->bind);

    safefree(st->param_values);
    safefree(st->param_lengths);
    safefree(st->param_formats);

    if (st->recv != NULL) {
        for (n = 0; n < st->nfields; n++)
            fupg_tio_free(&st->recv[n]);
    }
    fupg_tio_free(&st->send);
    safefree(st->recv);

    PQclear(st->describe);
    PQclear(st->result);

    /* Drop the connection reference last; st->conn was still needed above */
    SvREFCNT_dec(st->conn->self);
    safefree(st);
}
/* Prepare the statement on the server and fetch its description.
 *
 * Does nothing when st->describe is already set. Confesses when an earlier
 * prepare succeeded but its describe did not (st->prepared set without
 * st->describe). On success st->name, st->prepared and st->describe are
 * filled in; on any failure the function croaks and results that are not
 * handed to the croak helper are cleared first. */
static void fupg_st_prepare(pTHX_ fupg_st *st) {
    if (st->describe) return;
    if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement");

    fupg_conn *c = st->conn;

    /* TODO: This is where we check for any cached prepared statements */
    snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++c->prep_counter);

    /* Prepare + describe go out together in a pipeline, which avoids a
     * double round-trip with the server. */
    PQenterPipelineMode(c->conn);
    PQsendPrepare(c->conn, st->name, st->query, 0, NULL);
    PQsendDescribePrepared(c->conn, st->name);
    PQpipelineSync(c->conn);

    /* Each command result is followed by a NULL terminator that is read and
     * dropped; the sync result comes last. */
    PGresult *prep = PQgetResult(c->conn);
    PQgetResult(c->conn); /* NULL */
    PGresult *desc = PQgetResult(c->conn);
    PQgetResult(c->conn); /* NULL */
    PGresult *sync = PQgetResult(c->conn);
    PQexitPipelineMode(c->conn);

    /* Stage 1: the prepare itself */
    if (!prep || PQresultStatus(prep) != PGRES_COMMAND_OK) {
        PQclear(desc);
        PQclear(sync);
        if (!prep)
            fupg_conn_croak(c, "prepare");
        else
            fupg_result_croak(prep, "prepare", st->query);
    }
    PQclear(prep);
    st->prepared = 1;

    /* Stage 2: the describe */
    if (!desc || PQresultStatus(desc) != PGRES_COMMAND_OK) {
        PQclear(sync);
        if (!desc)
            fupg_conn_croak(c, "prepare");
        else
            fupg_result_croak(desc, "prepare", st->query);
    }
    st->describe = desc;

    /* Stage 3: the pipeline sync marker */
    if (!sync)
        fupg_conn_croak(c, "prepare");
    if (PQresultStatus(sync) != PGRES_PIPELINE_SYNC)
        fupg_result_croak(sync, "prepare", st->query);
    PQclear(sync);
}
/* Prepare the statement if needed and describe its bind parameters.
 *
 * Returns a mortal reference to an array with one hash per parameter; each
 * hash has a single "oid" key holding the parameter's type OID. */
static SV *fupg_st_params(pTHX_ fupg_st *st) {
    fupg_st_prepare(aTHX_ st);
    int i, nparams = PQnparams(st->describe);
    /* newAV_alloc_x() must not be called with a size below 1, and statements
     * without parameters are common; fall back to a plain newAV() then. */
    AV *av = nparams > 0 ? newAV_alloc_x(nparams) : newAV();
    for (i = 0; i < nparams; i++) {
        HV *hv = newHV();
        hv_stores(hv, "oid", newSViv(PQparamtype(st->describe, i)));
        av_push_simple(av, newRV_noinc((SV *)hv));
    }
    return sv_2mortal(newRV_noinc((SV *)av));
}
/* Prepare the statement if needed and describe its result columns.
 *
 * Returns a mortal reference to an array with one hash per column. Each hash
 * has a "name" (UTF-8 flagged) and an "oid" key, plus "typemod" when the
 * type modifier reported by libpq is not negative. */
static SV *fupg_st_columns(pTHX_ fupg_st *st) {
    fupg_st_prepare(aTHX_ st);
    int i, nfields = PQnfields(st->describe);
    /* newAV_alloc_x() must not be called with a size below 1; a statement
     * that returns no columns gets a plain empty array instead. */
    AV *av = nfields > 0 ? newAV_alloc_x(nfields) : newAV();
    for (i = 0; i < nfields; i++) {
        HV *hv = newHV();
        const char *name = PQfname(st->describe, i);
        hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1));
        hv_stores(hv, "oid", newSViv(PQftype(st->describe, i)));
        int tmod = PQfmod(st->describe, i);
        if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod));
        av_push_simple(av, newRV_noinc((SV *)hv));
    }
    return sv_2mortal(newRV_noinc((SV *)av));
}
/* Build the parameter arrays that are handed to PQexecPrepared().
 *
 * With FUPG_TEXT_PARAMS set, only st->param_values is filled: each defined
 * bind value is passed as its UTF-8 string and undefined values as NULL;
 * lengths and formats stay NULL.
 *
 * Otherwise every defined bind value is serialized into the connection's
 * shared buffer through a type I/O context looked up by the parameter's
 * OID, and param_lengths / param_formats are filled in as well.
 *
 * `refresh_done` is passed through to fupg_tio_setup(). */
static void fupg_params_setup(pTHX_ fupg_st *st, int *refresh_done) {
    int i;

    /* An earlier execute attempt may have croaked after these arrays were
     * allocated (st->result is only set on success, so a retry gets here
     * again). Release them first instead of leaking them, and reset the
     * pointers so that a croak below cannot lead to a double free later. */
    safefree(st->param_values);
    safefree(st->param_lengths);
    safefree(st->param_formats);
    st->param_values = NULL;
    st->param_lengths = NULL;
    st->param_formats = NULL;

    st->param_values = safecalloc(st->nbind, sizeof(*st->param_values));
    if (st->stflags & FUPG_TEXT_PARAMS) {
        for (i = 0; i < st->nbind; i++)
            st->param_values[i] = !SvOK(st->bind[i]) ? NULL : SvPVutf8_nolen(st->bind[i]);
        return;
    }

    fustr *buf = &st->conn->buf;
    buf->cur = fustr_start(buf);
    st->param_lengths = safecalloc(st->nbind, sizeof(*st->param_lengths));
    st->param_formats = safecalloc(st->nbind, sizeof(*st->param_formats));

    /* Pass 1: serialize each value and record its length */
    size_t off = 0;
    for (i = 0; i < st->nbind; i++) {
        if (!SvOK(st->bind[i])) {
            st->param_values[i] = NULL;
            continue;
        }
        /* FUPG_TEXT_PARAMS is known to be clear here (handled by the early
         * return above), so the flags are just FUPGT_SEND. */
        fupg_tio_setup(aTHX_ st->conn, &st->send, FUPGT_SEND,
            PQparamtype(st->describe, i), refresh_done);
        off = fustr_len(buf);
        st->send.send(aTHX_ &st->send, st->bind[i], buf);
        fupg_tio_free(&st->send);
        memset(&st->send, 0, sizeof(st->send));
        st->param_lengths[i] = fustr_len(buf) - off;
        st->param_formats[i] = 1;
        /* Non-NULL placeholder. The real pointer is not written here because
         * the buffer may be invalidated when writing the next param. */
        st->param_values[i] = "";
    }

    /* Pass 2: the buffer is final now, turn the lengths into pointers */
    off = 0;
    buf->cur = fustr_start(buf);
    for (i = 0; i < st->nbind; i++) {
        if (st->param_values[i]) {
            st->param_values[i] = buf->cur + off;
            off += st->param_lengths[i];
        }
    }
}
/* Execute the statement once and keep the full result in st->result.
 *
 * Prepares the statement first if necessary, checks that the number of bind
 * values matches what the statement expects, sets up the parameters, runs
 * PQexecPrepared() and finally sets up one receive type I/O context per
 * result column. Croaks/confesses on any failure. */
static void fupg_st_execute(pTHX_ fupg_st *st) {
    /* Fetching the results more than once is not allowed. There is no
     * obvious need for it, and disallowing it leaves room for fetching the
     * results in a streaming fashion later without breaking API compat. */
    if (st->result) fu_confess("Invalid attempt to execute statement multiple times");

    /* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */
    fupg_st_prepare(aTHX_ st);

    int expected = PQnparams(st->describe);
    if (expected != st->nbind)
        fu_confess("Statement expects %d bind parameters but %d were given", expected, st->nbind);

    int refresh_done = 0;
    fupg_params_setup(aTHX_ st, &refresh_done);

    /* Not an ideal approach: storing the full query results in a PGresult
     * involves unnecessary parsing, memory allocation and copying. The wire
     * protocol is simple enough that the results could be parsed directly
     * from the network buffers, which would be much more efficient, but
     * libpq offers no way to do that.
     * Chunked mode exists, but it appears to only save a bit of memory in
     * exchange for more and smaller malloc()/free()'s, so it is unlikely to
     * be much of a performance improvement. */
    int text_results = (st->stflags & FUPG_TEXT_RESULTS) != 0;
    PGresult *res = PQexecPrepared(st->conn->conn,
        st->name,
        st->nbind,
        (const char * const *)st->param_values,
        st->param_lengths,
        st->param_formats,
        text_results ? 0 : 1);
    if (res == NULL)
        fupg_conn_croak(st->conn, "exec");

    int status = PQresultStatus(res);
    if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK)
        fupg_result_croak(res, "exec", st->query);

    st->result = res;
    st->nfields = PQnfields(res);
    st->recv = safecalloc(st->nfields, sizeof(*st->recv));

    for (int col = 0; col < st->nfields; col++) {
        fupg_tio_setup(aTHX_ st->conn, &st->recv[col],
            FUPGT_RECV | (text_results ? FUPGT_TEXT : 0),
            PQftype(st->result, col), &refresh_done);
    }
}
/* Convert one cell of the executed result into a new SV. SQL NULL becomes a
 * fresh undefined SV; anything else goes through the receive handler of the
 * column's type I/O context. */
static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) {
    PGresult *res = st->result;
    if (!PQgetisnull(res, row, col)) {
        const fupg_tio *tio = &st->recv[col];
        return tio->recv(aTHX_ tio, PQgetvalue(res, row, col), PQgetlength(res, row, col));
    }
    return newSV(0);
}
/* Confess when two result columns, counting from index `start`, share the
 * same name. Used before building hashes keyed on column names, where a
 * duplicate would silently overwrite an earlier column. */
static void fupg_st_check_dupcols(pTHX_ fupg_st *st, int start) {
    PGresult *r = st->result;
    /* The scratch hash is mortal, so it is released automatically on normal
     * return as well as when fu_confess() unwinds. It must not be
     * SvREFCNT_dec'd by hand on top of that: doing so frees it immediately
     * and the later mortal cleanup would then free it a second time. */
    HV *seen = (HV *)sv_2mortal((SV *)newHV());
    int i, nfields = PQnfields(r);
    for (i = start; i < nfields; i++) {
        const char *key = PQfname(r, i);
        /* A negative length tells the hv_* functions the key is UTF-8 */
        I32 len = -(I32)strlen(key);
        if (hv_exists(seen, key, len))
            fu_confess("Query returns multiple columns with the same name ('%s')", key);
        hv_store(seen, key, len, &PL_sv_yes, 0);
    }
}
/* Result fetching */
/* Execute the statement and return whatever fupg_exec_result() makes of the
 * stored result. */
static SV *fupg_st_exec(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    PGresult *res = st->result;
    return fupg_exec_result(res);
}
/* Execute the statement and return its single value as a mortal SV.
 * Confesses unless the result has exactly one column and at most one row;
 * a result with zero rows yields an undefined value. */
static SV *fupg_st_val(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);

    if (st->nfields > 1) fu_confess("Invalid use of $st->val() on query returning more than one column");
    if (st->nfields == 0) fu_confess("Invalid use of $st->val() on query returning no data");

    int nrows = PQntuples(st->result);
    if (nrows > 1) fu_confess("Invalid use of $st->val() on query returning more than one row");

    SV *val;
    if (nrows == 0)
        val = newSV(0);
    else
        val = fupg_st_getval(aTHX_ st, 0, 0);
    return sv_2mortal(val);
}
/* Execute the statement and return the columns of its single row as a list
 * on the Perl stack. Confesses unless the result has exactly one row.
 * Returns the number of stack items that were set: 1 outside list context
 * (the column count is stored in ST(0)), st->nfields otherwise. */
static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) {
dSP;
fupg_st_execute(aTHX_ st);
if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows");
if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row");
/* Outside list context only the number of columns is returned */
if (GIMME_V != G_LIST) {
ST(0) = sv_2mortal(newSViv(st->nfields));
return 1;
}
/* NOTE(review): one item is popped before the values are pushed, presumably
 * the statement object the method was called on -- confirm against the XS
 * caller. */
(void)POPs;
EXTEND(SP, st->nfields);
int i;
/* Every column value is pushed as a mortal */
for (i=0; i<st->nfields; i++) mPUSHs(fupg_st_getval(aTHX_ st, 0, i));
return st->nfields;
}
/* Execute the statement and return its single row as a mortal reference to
 * an array of column values. Confesses unless the result has exactly one
 * row. */
static SV *fupg_st_rowa(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    /* These messages used to name rowl(), which pointed users at the wrong method */
    if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowa() on query returning zero rows");
    if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowa() on query returning more than one row");
    /* newAV_alloc_x() must not be called with a size below 1, and a row can
     * legitimately have zero columns. */
    AV *av = st->nfields > 0 ? newAV_alloc_x(st->nfields) : newAV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)av));
    int i;
    for (i = 0; i < st->nfields; i++)
        av_push_simple(av, fupg_st_getval(aTHX_ st, 0, i));
    return sv;
}
/* Execute the statement and return its single row as a mortal reference to
 * a hash keyed on column name. Confesses on duplicate column names and
 * unless the result has exactly one row. */
static SV *fupg_st_rowh(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    fupg_st_check_dupcols(aTHX_ st, 0);

    int nrows = PQntuples(st->result);
    if (nrows == 0) fu_confess("Invalid use of $st->rowh() on query returning zero rows");
    if (nrows > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row");

    HV *row = newHV();
    SV *ref = sv_2mortal(newRV_noinc((SV *)row));
    for (int col = 0; col < st->nfields; col++) {
        const char *name = PQfname(st->result, col);
        /* Negative key length marks the column name as UTF-8 */
        hv_store(row, name, -strlen(name), fupg_st_getval(aTHX_ st, 0, col), 0);
    }
    return ref;
}
/* Execute the statement and return all rows as a mortal reference to an
 * array of array references, one inner array of column values per row. */
static SV *fupg_st_alla(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    int i, j, nrows = PQntuples(st->result);
    /* newAV_alloc_x() must not be called with a size below 1. Empty result
     * sets are common and rows may have zero columns, so both the outer and
     * the inner arrays fall back to newAV() in that case. */
    AV *av = nrows > 0 ? newAV_alloc_x(nrows) : newAV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)av));
    for (i = 0; i < nrows; i++) {
        AV *row = st->nfields > 0 ? newAV_alloc_x(st->nfields) : newAV();
        /* Attach the row to the result first so it is owned by `av` */
        av_push_simple(av, newRV_noinc((SV *)row));
        for (j = 0; j < st->nfields; j++)
            av_push_simple(row, fupg_st_getval(aTHX_ st, i, j));
    }
    return sv;
}
/* Execute the statement and return all rows as a mortal reference to an
 * array of hash references, each hash keyed on column name. Confesses on
 * duplicate column names. */
static SV *fupg_st_allh(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    fupg_st_check_dupcols(aTHX_ st, 0);
    int i, j, nrows = PQntuples(st->result);
    /* newAV_alloc_x() must not be called with a size below 1, and empty
     * result sets are common. */
    AV *av = nrows > 0 ? newAV_alloc_x(nrows) : newAV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)av));
    for (i = 0; i < nrows; i++) {
        HV *row = newHV();
        /* Attach the row to the result first so it is owned by `av` */
        av_push_simple(av, newRV_noinc((SV *)row));
        for (j = 0; j < st->nfields; j++) {
            const char *key = PQfname(st->result, j);
            /* Negative key length marks the column name as UTF-8 */
            hv_store(row, key, -(I32)strlen(key), fupg_st_getval(aTHX_ st, i, j), 0);
        }
    }
    return sv;
}
/* Execute the statement and return every value of every row, in row-major
 * order, as a mortal reference to one flat array. */
static SV *fupg_st_flat(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    int i, j, nrows = PQntuples(st->result);
    /* Size the array for all values rather than for the number of rows.
     * newAV_alloc_x() must not be called with a size below 1, so an empty
     * result (no rows or no columns) gets a plain newAV() instead. */
    SSize_t total = (SSize_t)nrows * (SSize_t)st->nfields;
    AV *av = total > 0 ? newAV_alloc_x(total) : newAV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)av));
    for (i = 0; i < nrows; i++) {
        for (j = 0; j < st->nfields; j++)
            av_push_simple(av, fupg_st_getval(aTHX_ st, i, j));
    }
    return sv;
}
/* Execute the statement and return a mortal reference to a hash that maps
 * the first column to the second. With a single-column result every key maps
 * to a true value. Confesses when the result has no columns, more than two
 * columns, or a duplicate key. */
static SV *fupg_st_kvv(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    if (st->nfields > 2) fu_confess("Invalid use of $st->kvv() on query returning more than two columns");
    if (st->nfields == 0) fu_confess("Invalid use of $st->kvv() on query returning no data");
    int i, nrows = PQntuples(st->result);
    HV *hv = newHV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
    for (i = 0; i < nrows; i++) {
        /* hv_store_ent() only reads the key SV and does not take ownership
         * of it, so it has to be released here. Making it mortal also covers
         * the fu_confess() path, where the key is still needed for the
         * message. */
        SV *key = sv_2mortal(fupg_st_getval(aTHX_ st, i, 0));
        if (hv_exists_ent(hv, key, 0))
            fu_confess("Key '%s' is duplicated in $st->kvv() query results", SvPV_nolen(key));
        hv_store_ent(hv, key, st->nfields == 1 ? &PL_sv_yes : fupg_st_getval(aTHX_ st, i, 1), 0);
    }
    return sv;
}
/* Execute the statement and return a mortal reference to a hash that maps
 * the first column to an array reference holding the remaining columns of
 * that row. Confesses when the result has no columns or a duplicate key. */
static SV *fupg_st_kva(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    if (st->nfields == 0) fu_confess("Invalid use of $st->kva() on query returning no data");
    int i, j, nrows = PQntuples(st->result);
    HV *hv = newHV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
    for (i = 0; i < nrows; i++) {
        /* hv_store_ent() only reads the key SV and does not take ownership
         * of it, so it has to be released here. Making it mortal also covers
         * the fu_confess() path, where the key is still needed for the
         * message. */
        SV *key = sv_2mortal(fupg_st_getval(aTHX_ st, i, 0));
        if (hv_exists_ent(hv, key, 0))
            fu_confess("Key '%s' is duplicated in $st->kva() query results", SvPV_nolen(key));
        /* nfields is at least 1 here, which keeps newAV_alloc_x() valid */
        AV *row = newAV_alloc_x(st->nfields);
        hv_store_ent(hv, key, newRV_noinc((SV *)row), 0);
        for (j = 1; j < st->nfields; j++)
            av_push_simple(row, fupg_st_getval(aTHX_ st, i, j));
    }
    return sv;
}
/* Execute the statement and return a mortal reference to a hash that maps
 * the first column to a hash reference holding the remaining columns of that
 * row, keyed on column name. Confesses when the result has no columns, when
 * two of the remaining columns share a name, or on a duplicate key. */
static SV *fupg_st_kvh(pTHX_ fupg_st *st) {
    fupg_st_execute(aTHX_ st);
    fupg_st_check_dupcols(aTHX_ st, 1);
    if (st->nfields == 0) fu_confess("Invalid use of $st->kvh() on query returning no data");
    int i, j, nrows = PQntuples(st->result);
    HV *hv = newHV();
    SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
    for (i = 0; i < nrows; i++) {
        /* hv_store_ent() only reads the key SV and does not take ownership
         * of it, so it has to be released here. Making it mortal also covers
         * the fu_confess() path, where the key is still needed for the
         * message. */
        SV *key = sv_2mortal(fupg_st_getval(aTHX_ st, i, 0));
        if (hv_exists_ent(hv, key, 0))
            fu_confess("Key '%s' is duplicated in $st->kvh() query results", SvPV_nolen(key));
        HV *row = newHV();
        hv_store_ent(hv, key, newRV_noinc((SV *)row), 0);
        for (j = 1; j < st->nfields; j++) {
            /* Named differently from the row key above to avoid shadowing it */
            const char *colname = PQfname(st->result, j);
            /* Negative key length marks the column name as UTF-8 */
            hv_store(row, colname, -(I32)strlen(colname), fupg_st_getval(aTHX_ st, i, j), 0);
        }
    }
    return sv;
}