pg: Support binary bind params

This commit is contained in:
Yorhel 2025-02-08 10:35:43 +01:00
parent 166744dd51
commit 30b457d2b8
6 changed files with 226 additions and 78 deletions

View file

@ -10,21 +10,22 @@ typedef struct {
UV cookie_counter;
UV cookie; /* currently active transaction object; 0 = none active */
int stflags;
fustr buf; /* Scratch space for query params */
} fupg_conn;
typedef struct fupg_txn fupg_txn;
struct fupg_txn {
SV *self;
struct fupg_txn *parent;
fupg_txn *parent;
fupg_conn *conn;
UV cookie; /* 0 means done */
int stflags;
char rollback_cmd[64];
};
typedef struct fupg_txn fupg_txn;
typedef struct {
/* Set in $conn->q() */
/* Set on creation */
SV *self; /* (unused, but whatever) */
fupg_conn *conn; /* has a refcnt on conn->self */
UV cookie;
@ -32,16 +33,18 @@ typedef struct {
SV **bind;
int nbind;
int stflags;
/* Set during prepare */
int prepared;
char name[32];
PGresult *describe;
/* Set during execute */
int nparam;
int nfields;
char **param;
const fupg_type **recv;
void **recvctx;
const char **param_values; /* Points into conn->buf or st->bind SVs, may be invalid after exec */
int *param_lengths;
int *param_formats;
fupg_recv *recv;
PGresult *result;
} fupg_st;
@ -147,8 +150,11 @@ static SV *fupg_connect(pTHX_ const char *str) {
croak_sv(sv);
}
fupg_conn *c = safecalloc(1, sizeof(fupg_conn));
fupg_conn *c = safemalloc(sizeof(fupg_conn));
c->conn = conn;
c->prep_counter = c->cookie_counter = c->cookie = 0;
c->stflags = 0;
fustr_init(&c->buf, NULL, SIZE_MAX);
return fu_selfobj(c, "FU::PG::conn");
}
@ -277,8 +283,8 @@ static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I3
st->bind = safemalloc((argc-2) * sizeof(SV *));
I32 i;
for (i=2; i < argc; i++) {
st->bind[st->nbind] = newSV(0);
sv_setsv(st->bind[st->nbind], ST(i));
SvGETMAGIC(ST(i));
st->bind[st->nbind] = SvREFCNT_inc(ST(i));
st->nbind++;
}
}
@ -352,6 +358,68 @@ static void fupg_st_check_dupcols(pTHX_ PGresult *r) {
SvREFCNT_dec((SV *)hv);
}
static void fupg_params_setup(pTHX_ fupg_st *st) {
int i;
st->param_values = safecalloc(st->nbind, sizeof(*st->param_values));
if (st->stflags & FUPG_TEXT_PARAMS) {
for (i=0; i<st->nbind; i++)
st->param_values[i] = !SvOK(st->bind[i]) ? NULL : SvPVutf8_nolen(st->bind[i]);
return;
}
fustr *buf = &st->conn->buf;
buf->cur = fustr_start(buf);
st->param_lengths = safecalloc(st->nbind, sizeof(*st->param_lengths));
st->param_formats = safecalloc(st->nbind, sizeof(*st->param_formats));
size_t off = 0;
for (i=0; i<st->nbind; i++) {
if (!SvOK(st->bind[i])) {
st->param_values[i] = NULL;
continue;
}
fupg_send send;
send.oid = PQparamtype(st->describe, i);
const fupg_core_type *t = fupg_core_type_byoid(send.oid);
if (!t)
fu_confess("Unable to use type oid %u as bind parameter", send.oid);
send.name = t->name;
send.fn = t->send;
off = fustr_len(buf);
send.fn(aTHX_ &send, st->bind[i], buf);
st->param_lengths[i] = fustr_len(buf) - off;
st->param_formats[i] = 1;
st->param_values[i] = "";
/* Don't write param_values here, the buffer may be invalidated when writing the next param */
}
off = 0;
buf->cur = fustr_start(buf);
for (i=0; i<st->nbind; i++) {
if (st->param_values[i]) {
st->param_values[i] = buf->cur + off;
off += st->param_lengths[i];
}
}
}
static void fupg_results_setup(pTHX_ fupg_st *st) {
int i;
st->recv = safecalloc(st->nfields, sizeof(*st->recv));
if (st->stflags & FUPG_TEXT_RESULTS) {
for (i=0; i<st->nfields; i++)
st->recv[i].fn = fupg_recv_textfmt;
return;
}
for (i=0; i<st->nfields; i++) {
fupg_recv *r = st->recv + i;
r->oid = PQftype(st->result, i);
const fupg_core_type *t = fupg_core_type_byoid(r->oid);
if (!t) fu_confess("Unable to receive query results of type oid %u", r->oid);
r->name = t->name;
r->fn = t->recv;
}
}
static void fupg_st_execute(pTHX_ fupg_st *st) {
/* Disallow fetching the results more than once. I don't see a reason why
* someone would need that and disallowing it leaves room for fetching the
@ -359,14 +427,10 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
if (st->result) fu_confess("Invalid attempt to execute statement multiple times");
/* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */
/* TODO: support binary format params */
fupg_st_prepare(aTHX_ st);
st->param = safemalloc(st->nbind * sizeof(char *));
int i;
for (i=0; i<st->nbind; i++) {
st->param[i] = SvPVutf8_nolen(st->bind[i]);
st->nparam++;
}
if (PQnparams(st->describe) != st->nbind)
fu_confess("Statement expects %d bind parameters but %d were given", PQnparams(st->describe), st->nbind);
fupg_params_setup(aTHX_ st);
/* I'm not super fond of this approach. Storing the full query results in a
* PGresult involves unnecessary parsing, memory allocation and copying.
@ -379,8 +443,12 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
* malloc()/free()'s. Performance-wise, it probably won't be much of an
* improvement */
PGresult *r = PQexecPrepared(st->conn->conn,
st->name, st->nparam, (const char * const*)st->param,
NULL, NULL, st->stflags & FUPG_TEXT_RESULTS ? 0 : 1);
st->name,
st->nbind,
(const char * const *)st->param_values,
st->param_lengths,
st->param_formats,
st->stflags & FUPG_TEXT_RESULTS ? 0 : 1);
if (!r) fupg_conn_croak(st->conn , "exec");
switch (PQresultStatus(r)) {
case PGRES_COMMAND_OK:
@ -390,22 +458,14 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
st->result = r;
st->nfields = PQnfields(r);
st->recv = safecalloc(st->nfields, sizeof(*st->recv));
st->recvctx = safecalloc(st->nfields, sizeof(*st->recvctx));
for (i=0; i<st->nfields; i++) {
st->recv[i] = fupg_type_lookup(st->stflags & FUPG_TEXT_RESULTS ? 0 : PQftype(r, i));
if (!st->recv[i])
fu_confess("Unable to receive query results of type %u", PQftype(r, i));
}
fupg_results_setup(aTHX_ st);
}
static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) {
PGresult *r = st->result;
if (PQgetisnull(r, row, col)) return newSV(0);
int len = PQgetlength(st->result, row, col);
const fupg_type *t = st->recv[col];
if (t->len && len != t->len) fu_confess("invalid length for type %s: %d\n", t->name, len);
return t->recv(aTHX_ PQgetvalue(r, row, col), len, st->recvctx[col]);
const fupg_recv *ctx = st->recv+col;
return ctx->fn(aTHX_ ctx, PQgetvalue(r, row, col), PQgetlength(r, row, col));
}
static SV *fupg_st_exec(pTHX_ fupg_st *st) {
@ -474,11 +534,10 @@ static void fupg_st_destroy(fupg_st *st) {
safefree(st->query);
for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]);
safefree(st->bind);
/* XXX: These point into bind SVs (for now):
* for (i=0; i < st->nparam; i++) safefree(st->param[i]); */
safefree(st->param);
safefree(st->param_values);
safefree(st->param_lengths);
safefree(st->param_formats);
safefree(st->recv);
safefree(st->recvctx); /* XXX: Needs type-specific free() for the individual pointers */
PQclear(st->describe);
PQclear(st->result);
SvREFCNT_dec(st->conn->self);