pg: Add support for record/composite types

This is simply magical. \o/

Vendored in khashl.h. I wouldn't have used it if this were the only
place where I'd need a custom hash table, but it should come in handy
for other tasks as well, especially when I get to implementing an LRU
for prepared statement caching.

(Can all be done with Perl HV's, but they're less efficient and more
cumbersome for these tasks)
This commit is contained in:
Yorhel 2025-02-11 15:56:35 +01:00
parent 33fe0d98a8
commit 87d99e412b
5 changed files with 642 additions and 4 deletions

View file

@ -3,6 +3,8 @@
#define FUPG_TEXT_RESULTS 4
#define FUPG_TEXT (FUPG_TEXT_PARAMS|FUPG_TEXT_RESULTS)
KHASHL_MAP_INIT(KH_LOCAL, fupg_records, fupg_records, Oid, fupg_record *, kh_hash_uint32, kh_eq_generic);
typedef struct {
SV *self;
PGconn *conn;
@ -12,6 +14,7 @@ typedef struct {
int stflags;
int ntypes;
fupg_type *types;
fupg_records *records;
fustr buf; /* Scratch space for query params */
} fupg_conn;
@ -135,6 +138,7 @@ static SV *fupg_connect(pTHX_ const char *str) {
c->stflags = 0;
c->ntypes = 0;
c->types = NULL;
c->records = fupg_records_init();
fustr_init(&c->buf, NULL, SIZE_MAX);
return fu_selfobj(c, "FU::Pg::conn");
}
@ -159,6 +163,9 @@ static void fupg_conn_destroy(fupg_conn *c) {
PQfinish(c->conn);
if (c->buf.sv) SvREFCNT_dec(c->buf.sv);
safefree(c->types);
khint_t k;
kh_foreach(c->records, k) safefree(kh_val(c->records, k));
fupg_records_destroy(c->records);
safefree(c);
}
@ -255,6 +262,7 @@ static void fupg_refresh_types(pTHX_ fupg_conn *c) {
const char *sql =
"SELECT oid, typname, typtype"
", CASE WHEN typtype = 'd' THEN typbasetype"
" WHEN typtype = 'c' THEN typrelid"
" WHEN typcategory = 'A' THEN typelem"
" ELSE 0 END"
" FROM pg_type"
@ -277,6 +285,9 @@ static void fupg_refresh_types(pTHX_ fupg_conn *c) {
if (typ == 'd') { /* domain */
t->send = fupg_send_domain;
t->recv = fupg_recv_domain;
} else if (typ == 'c') { /* composite type */
t->send = fupg_send_record;
t->recv = fupg_recv_record;
} else { /* array */
t->send = fupg_send_array;
t->recv = fupg_recv_array;
@ -286,7 +297,7 @@ static void fupg_refresh_types(pTHX_ fupg_conn *c) {
t->send = fupg_send_text;
t->recv = fupg_recv_text;
} else {
/* TODO: records, (multi)ranges, custom overrides, by-name lookup for dynamic-oid types */
/* TODO: (multi)ranges, custom overrides, by-name lookup for dynamic-oid types */
const fupg_type *builtin = fupg_builtin_byoid(t->oid);
if (builtin) {
t->send = builtin->send;
@ -308,6 +319,39 @@ static const fupg_type *fupg_lookup_type(pTHX_ fupg_conn *c, int *refresh_done,
return fupg_type_byoid(c->types, c->ntypes, oid);
}
static const fupg_record *fupg_lookup_record(pTHX_ fupg_conn *c, Oid oid) {
khint_t k = fupg_records_get(c->records, oid);
if (k != kh_end(c->records)) return kh_val(c->records, k);
const char *sql =
"SELECT atttypid, attname"
" FROM pg_attribute"
" WHERE NOT attisdropped AND attnum > 0 AND attrelid = $1"
" ORDER BY attnum";
char buf[4];
fu_tobeU(32, buf, oid);
const char *abuf = buf;
int len = 4;
int format = 1;
PGresult *r = PQexecParams(c->conn, sql, 1, NULL, &abuf, &len, &format, 1);
if (!r) fupg_conn_croak(c, "exec");
if (PQresultStatus(r) != PGRES_TUPLES_OK) fupg_result_croak(r, "exec", sql);
fupg_record *record = safemalloc(sizeof(*record) + PQntuples(r) * sizeof(*record->attrs));
record->nattrs = PQntuples(r);
int i;
for (i=0; i<record->nattrs; i++) {
record->attrs[i].oid = fu_frombeU(32, PQgetvalue(r, i, 0));
snprintf(record->attrs[i].name, sizeof(record->attrs->name), "%s", PQgetvalue(r, i, 1));
}
k = fupg_records_put(c->records, oid, &i);
kh_val(c->records, k) = record;
PQclear(r);
return record;
}
#define FUPGT_TEXT 1
#define FUPGT_SEND 2
#define FUPGT_RECV 4
@ -337,13 +381,26 @@ static void fupg_tio_setup(pTHX_ fupg_conn *conn, fupg_tio *tio, int flags, Oid
if (flags & FUPGT_SEND ? tio->send == fupg_send_array : tio->recv == fupg_recv_array) {
tio->arrayelem = safecalloc(1, sizeof(*tio->arrayelem));
fupg_tio_setup(aTHX_ conn, tio->arrayelem, flags, t->elemoid, refresh_done);
} else if (flags & FUPGT_SEND ? tio->send == fupg_send_record : tio->recv == fupg_recv_record) {
tio->record.info = fupg_lookup_record(conn, t->elemoid);
if (!tio->record.info) fu_confess("Unable to find attributes for record type '%s' (oid %u, relid %u)", t->name, t->oid, t->elemoid);
tio->record.tio = safecalloc(tio->record.info->nattrs, sizeof(*tio->record.tio));
int i;
for (i=0; i<tio->record.info->nattrs; i++)
fupg_tio_setup(aTHX_ conn, tio->record.tio+i, flags, tio->record.info->attrs[i].oid, refresh_done);
}
}
static void fupg_tio_free(fupg_tio *tio) {
if (!tio) return;
/* XXX: This assumes send/recv are the same types, at least for arrays & records */
if (tio->send == fupg_send_array) {
fupg_tio_free(tio->arrayelem);
safefree(tio->arrayelem);
} else if (tio->send == fupg_send_record) {
int i;
for (i=0; i<tio->record.info->nattrs; i++)
fupg_tio_free(tio->record.tio+i);
safefree(tio->record.tio);
}
}