diff --git a/FU.xs b/FU.xs index e54795a..13cdca2 100644 --- a/FU.xs +++ b/FU.xs @@ -11,6 +11,7 @@ #include "c/jsonfmt.c" #include "c/jsonparse.c" #include "c/libpq.h" +#include "c/pgtypes.c" #include "c/pgconn.c" @@ -139,6 +140,11 @@ void q(fupg_txn c, SV *sv, ...) MODULE = FU PACKAGE = FU::PG::st +void text_results(fupg_st *st, ...) + CODE: + st->text_results = items == 1 || SvTRUE(ST(1)); + XSRETURN(1); + void params(fupg_st *st) CODE: FUPG_ST_COOKIE; diff --git a/c/pgconn.c b/c/pgconn.c index c43a87f..cb725ed 100644 --- a/c/pgconn.c +++ b/c/pgconn.c @@ -149,13 +149,6 @@ invalid: fu_confess("invalid transaction object"); } -/* Read a Perl value from a PGresult. - * Currently assumes text format and just creates a PV. */ -static SV *fupg_val(pTHX_ const PGresult *r, int row, int col) { - if (PQgetisnull(r, row, col)) return newSV(0); - return newSVpvn_utf8(PQgetvalue(r, row, col), PQgetlength(r, row, col), 1); -} - typedef struct { /* Set in $conn->q() */ SV *self; @@ -163,14 +156,19 @@ typedef struct { UV cookie; char *query; SV **bind; - int bindn; + int nbind; + bool text_params; + bool text_results; /* Set during prepare */ int prepared; char name[32]; PGresult *describe; /* Set during execute */ - int paramn; + int nparam; + int nfields; char **param; + const fupg_type **recv; + void **recvctx; PGresult *result; } fupg_st; @@ -178,6 +176,7 @@ static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) { fupg_st *st = safecalloc(1, sizeof(fupg_st)); st->conn = c; st->cookie = c->cookie; + st->text_params = st->text_results = true; /* TODO: default to false */ SvREFCNT_inc(c->self); st->query = savepv(query); @@ -185,9 +184,9 @@ static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) { st->bind = safemalloc((argc-2) * sizeof(SV *)); I32 i; for (i=2; i < argc; i++) { - st->bind[st->bindn] = newSV(0); - sv_setsv(st->bind[st->bindn], ST(i)); - st->bindn++; + st->bind[st->nbind] = newSV(0); + sv_setsv(st->bind[st->nbind], ST(i)); + st->nbind++; } } @@ -267,12 +266,13 @@ static void fupg_st_execute(pTHX_ fupg_st *st) { if (st->result) fu_confess("Invalid attempt to execute statement multiple times"); /* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */ + /* TODO: support binary format params */ fupg_st_prepare(aTHX_ st); - st->param = safemalloc(st->bindn * sizeof(char *)); + st->param = safemalloc(st->nbind * sizeof(char *)); int i; - for (i=0; ibindn; i++) { + for (i=0; inbind; i++) { st->param[i] = SvPVutf8_nolen(st->bind[i]); - st->paramn++; + st->nparam++; } /* I'm not super fond of this approach. Storing the full query results in a @@ -285,7 +285,9 @@ static void fupg_st_execute(pTHX_ fupg_st *st) { * gather that just saves a bit of memory in exchange for more and smaller * malloc()/free()'s. Performance-wise, it probably won't be much of an * improvement */ - PGresult *r = PQexecPrepared(st->conn->conn, st->name, st->paramn, (const char * const*)st->param, NULL, NULL, 0); + PGresult *r = PQexecPrepared(st->conn->conn, + st->name, st->nparam, (const char * const*)st->param, + NULL, NULL, st->text_results ? 0 : 1); if (!r) fupg_conn_croak(st->conn , "exec"); switch (PQresultStatus(r)) { case PGRES_COMMAND_OK: @@ -293,6 +295,24 @@ static void fupg_st_execute(pTHX_ fupg_st *st) { default: fupg_result_croak(r, "exec", st->query); } st->result = r; + + st->nfields = PQnfields(r); + st->recv = safecalloc(st->nfields, sizeof(*st->recv)); + st->recvctx = safecalloc(st->nfields, sizeof(*st->recvctx)); + for (i=0; infields; i++) { + st->recv[i] = fupg_type_lookup(st->text_results ? 0 : PQftype(r, i)); + if (!st->recv[i]) + fu_confess("Unable to receive query results of type %u", PQftype(r, i)); + } +} + +static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) { + PGresult *r = st->result; + if (PQgetisnull(r, row, col)) return newSV(0); + int len = PQgetlength(st->result, row, col); + const fupg_type *t = st->recv[col]; + if (t->len && len != t->len) fu_confess("invalid length for type %s: %d\n", t->name, len); + return t->recv(aTHX_ PQgetvalue(r, row, col), len, st->recvctx[col]); } static SV *fupg_st_exec(pTHX_ fupg_st *st) { @@ -306,7 +326,7 @@ static SV *fupg_st_val(pTHX_ fupg_st *st) { if (PQnfields(st->describe) == 0) fu_confess("Invalid use of $st->val() on query returning no data"); fupg_st_execute(aTHX_ st); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->val() on query returning more than one row"); - SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_val(aTHX_ st->result, 0, 0); + SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_st_getval(aTHX_ st, 0, 0); return sv_2mortal(sv); } @@ -316,24 +336,24 @@ static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) { if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); if (GIMME_V != G_LIST) { - ST(0) = sv_2mortal(newSViv(PQnfields(st->result))); + ST(0) = sv_2mortal(newSViv(st->nfields)); return 1; } - int i, nfields = PQnfields(st->result); (void)POPs; - EXTEND(SP, nfields); - for (i=0; iresult, 0, i)); - return nfields; + EXTEND(SP, st->nfields); + int i; + for (i=0; infields; i++) mPUSHs(fupg_st_getval(aTHX_ st, 0, i)); + return st->nfields; } static SV *fupg_st_rowa(pTHX_ fupg_st *st) { fupg_st_execute(aTHX_ st); if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); - int i, nfields = PQnfields(st->result); - AV *av = newAV_alloc_x(nfields); + AV *av = newAV_alloc_x(st->nfields); SV *sv = sv_2mortal(newRV_noinc((SV *)av)); - for (i=0; iresult, 0, i)); + int i; + for (i=0; infields; i++) av_push_simple(av, fupg_st_getval(aTHX_ st, 0, i)); return sv; } @@ -343,12 +363,12 @@ static SV *fupg_st_rowh(pTHX_ fupg_st *st) { fupg_st_execute(aTHX_ st); if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowh() on query returning zero rows"); if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row"); - int i, nfields = PQnfields(st->result); HV *hv = newHV(); SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); - for (i=0; infields; i++) { const char *key = PQfname(st->result, i); - hv_store(hv, key, -strlen(key), fupg_val(aTHX_ st->result, 0, i), 0); + hv_store(hv, key, -strlen(key), fupg_st_getval(aTHX_ st, 0, i), 0); } return sv; } @@ -358,11 +378,14 @@ static void fupg_st_destroy(fupg_st *st) { /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); - if (st->query) safefree(st->query); - for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]); - if (st->bind) safefree(st->bind); - /* XXX: These point into bind SVs (for now): for (i=0; i < st->paramn; i++) safefree(st->param[i]); */ - if (st->param) safefree(st->param); + safefree(st->query); + for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]); + safefree(st->bind); + /* XXX: These point into bind SVs (for now): + * for (i=0; i < st->nparam; i++) safefree(st->param[i]); */ + safefree(st->param); + safefree(st->recv); + safefree(st->recvctx); /* XXX: Needs type-specific free() for the individual pointers */ PQclear(st->describe); PQclear(st->result); SvREFCNT_dec(st->conn->self); @@ -372,4 +395,5 @@ static void fupg_st_destroy(fupg_st *st) { /* TODO: $st->alla, allh, flat, kvv, kva, kvh */ /* TODO: Prepared statement caching */ -/* TODO: Binary format fetching & type handling */ +/* TODO: Binary format bind parameters */ +/* TODO: Custom type handling */ diff --git a/c/pgtypes.c b/c/pgtypes.c new file mode 100644 index 0000000..35eaf52 --- /dev/null +++ b/c/pgtypes.c @@ -0,0 +1,72 @@ +/* Send function, takes a Perl value and should write the binary encoded + * format into the given fustr. */ +typedef void (*fupg_send_fn)(pTHX_ SV *, fustr *, void *); + +/* Receive function, takes a binary string and should return a Perl value. + * libpq guarantees that the given buffer is aligned to MAXIMUM_ALIGNOF. */ +typedef SV *(*fupg_recv_fn)(pTHX_ const char *, int, void *); + +typedef struct { + Oid oid; + int len; + const char *name; + fupg_send_fn send; + fupg_recv_fn recv; +} fupg_type; + + + +#define RECVFN(name) static SV *fupg_recv_##name(pTHX_ const char *buf, int buflen __attribute__((unused)), void *data __attribute__((unused))) + +RECVFN(textfmt) { + return newSVpvn_utf8(buf, buflen, 1); +} + +RECVFN(bool) { + return *buf ? &PL_sv_yes : &PL_sv_no; +} + +RECVFN(int2) { + return newSViv((I16)__builtin_bswap16(*((U16 *)buf))); +} + +RECVFN(int4) { + return newSViv((I32)__builtin_bswap32(*((U32 *)buf))); +} + +RECVFN(int8) { + return newSViv((I64)__builtin_bswap64(*((U64 *)buf))); +} + +#undef RECVFN + + + +#define R(name) fupg_recv_##name + +/* Sorted by oid to support binary search. + * (XXX: hash lookup might be faster, but requires codegen) */ +static const fupg_type fupg_types[] = { + { 0, 0, NULL, NULL, R(textfmt) }, /* Invalid Oid, abused for text format */ + { 16, 1, "bool", NULL, R(bool) }, + { 20, 8, "int8", NULL, R(int8) }, + { 21, 2, "int2", NULL, R(int2) }, + { 23, 4, "int4", NULL, R(int4) }, +}; +/* TODO: A LOT MORE TYPES */ + +#undef R + +#define FUPG_TYPES (sizeof(fupg_types) / sizeof(fupg_type)) + + +static const fupg_type *fupg_type_lookup(Oid oid) { + int i, b = 0, e = FUPG_TYPES-1; + while (b <= e) { + i = b + (e - b)/2; + if (fupg_types[i].oid == oid) return fupg_types+i; + if (fupg_types[i].oid < oid) b = i+1; + else e = i-1; + } + return NULL; +} diff --git a/t/pgtypes.t b/t/pgtypes.t new file mode 100644 index 0000000..1a8a183 --- /dev/null +++ b/t/pgtypes.t @@ -0,0 +1,28 @@ +use v5.36; +use Test::More; +no warnings 'experimental::builtin'; +use builtin qw/true false is_bool created_as_number/; + +plan skip_all => $@ if !eval { require FU::PG; } && $@ =~ /Unable to load libpq/; +die $@ if $@; +plan skip_all => 'Please set FU_TEST_DB to a PostgreSQL connection string to run these tests' if !$ENV{FU_TEST_DB}; + +my $conn = FU::PG->connect($ENV{FU_TEST_DB}); +$conn->_debug_trace(0); + +sub v($type, $v, $sql=$v) { + $sql = "($sql)::$type"; + my $res = $conn->q("SELECT $sql")->text_results(0)->val; + ok is_bool($res), "recv bool $sql" if $type eq 'bool'; + ok created_as_number($res), "recv number $sql" if $type =~ /^int/; + is $res, $v, "recv value $sql"; +} + +v bool => true, 'true'; +v bool => false, 'false'; + +v int2 => $_ for (1, -1, -32768, 32767, 12345, -12345); +v int4 => $_ for (1, -1, -2147483648, 2147483647, 1234567890, -1234567890); +v int8 => $_ for (1, -1, -9223372036854775808, 9223372036854775807, 1234567890123456789, -1234567890123456789); + +done_testing;