pg: Initial support for receiving binary results
Just the initial framework stuff and a few types to test with.
This commit is contained in:
parent
7c8473533d
commit
8f94dd0921
4 changed files with 164 additions and 34 deletions
6
FU.xs
6
FU.xs
|
|
@ -11,6 +11,7 @@
|
|||
#include "c/jsonfmt.c"
|
||||
#include "c/jsonparse.c"
|
||||
#include "c/libpq.h"
|
||||
#include "c/pgtypes.c"
|
||||
#include "c/pgconn.c"
|
||||
|
||||
|
||||
|
|
@ -139,6 +140,11 @@ void q(fupg_txn c, SV *sv, ...)
|
|||
|
||||
MODULE = FU PACKAGE = FU::PG::st
|
||||
|
||||
void text_results(fupg_st *st, ...)
|
||||
CODE:
|
||||
st->text_results = items == 1 || SvTRUE(ST(1));
|
||||
XSRETURN(1);
|
||||
|
||||
void params(fupg_st *st)
|
||||
CODE:
|
||||
FUPG_ST_COOKIE;
|
||||
|
|
|
|||
92
c/pgconn.c
92
c/pgconn.c
|
|
@ -149,13 +149,6 @@ invalid:
|
|||
fu_confess("invalid transaction object");
|
||||
}
|
||||
|
||||
/* Read a Perl value from a PGresult.
|
||||
* Currently assumes text format and just creates a PV. */
|
||||
static SV *fupg_val(pTHX_ const PGresult *r, int row, int col) {
|
||||
if (PQgetisnull(r, row, col)) return newSV(0);
|
||||
return newSVpvn_utf8(PQgetvalue(r, row, col), PQgetlength(r, row, col), 1);
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
/* Set in $conn->q() */
|
||||
SV *self;
|
||||
|
|
@ -163,14 +156,19 @@ typedef struct {
|
|||
UV cookie;
|
||||
char *query;
|
||||
SV **bind;
|
||||
int bindn;
|
||||
int nbind;
|
||||
bool text_params;
|
||||
bool text_results;
|
||||
/* Set during prepare */
|
||||
int prepared;
|
||||
char name[32];
|
||||
PGresult *describe;
|
||||
/* Set during execute */
|
||||
int paramn;
|
||||
int nparam;
|
||||
int nfields;
|
||||
char **param;
|
||||
const fupg_type **recv;
|
||||
void **recvctx;
|
||||
PGresult *result;
|
||||
} fupg_st;
|
||||
|
||||
|
|
@ -178,6 +176,7 @@ static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
|
|||
fupg_st *st = safecalloc(1, sizeof(fupg_st));
|
||||
st->conn = c;
|
||||
st->cookie = c->cookie;
|
||||
st->text_params = st->text_results = true; /* TODO: default to false */
|
||||
SvREFCNT_inc(c->self);
|
||||
|
||||
st->query = savepv(query);
|
||||
|
|
@ -185,9 +184,9 @@ static SV *fupg_q(pTHX_ fupg_conn *c, const char *query, I32 ax, I32 argc) {
|
|||
st->bind = safemalloc((argc-2) * sizeof(SV *));
|
||||
I32 i;
|
||||
for (i=2; i < argc; i++) {
|
||||
st->bind[st->bindn] = newSV(0);
|
||||
sv_setsv(st->bind[st->bindn], ST(i));
|
||||
st->bindn++;
|
||||
st->bind[st->nbind] = newSV(0);
|
||||
sv_setsv(st->bind[st->nbind], ST(i));
|
||||
st->nbind++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -267,12 +266,13 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
|
|||
if (st->result) fu_confess("Invalid attempt to execute statement multiple times");
|
||||
|
||||
/* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */
|
||||
/* TODO: support binary format params */
|
||||
fupg_st_prepare(aTHX_ st);
|
||||
st->param = safemalloc(st->bindn * sizeof(char *));
|
||||
st->param = safemalloc(st->nbind * sizeof(char *));
|
||||
int i;
|
||||
for (i=0; i<st->bindn; i++) {
|
||||
for (i=0; i<st->nbind; i++) {
|
||||
st->param[i] = SvPVutf8_nolen(st->bind[i]);
|
||||
st->paramn++;
|
||||
st->nparam++;
|
||||
}
|
||||
|
||||
/* I'm not super fond of this approach. Storing the full query results in a
|
||||
|
|
@ -285,7 +285,9 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
|
|||
* gather that just saves a bit of memory in exchange for more and smaller
|
||||
* malloc()/free()'s. Performance-wise, it probably won't be much of an
|
||||
* improvement */
|
||||
PGresult *r = PQexecPrepared(st->conn->conn, st->name, st->paramn, (const char * const*)st->param, NULL, NULL, 0);
|
||||
PGresult *r = PQexecPrepared(st->conn->conn,
|
||||
st->name, st->nparam, (const char * const*)st->param,
|
||||
NULL, NULL, st->text_results ? 0 : 1);
|
||||
if (!r) fupg_conn_croak(st->conn , "exec");
|
||||
switch (PQresultStatus(r)) {
|
||||
case PGRES_COMMAND_OK:
|
||||
|
|
@ -293,6 +295,24 @@ static void fupg_st_execute(pTHX_ fupg_st *st) {
|
|||
default: fupg_result_croak(r, "exec", st->query);
|
||||
}
|
||||
st->result = r;
|
||||
|
||||
st->nfields = PQnfields(r);
|
||||
st->recv = safecalloc(st->nfields, sizeof(*st->recv));
|
||||
st->recvctx = safecalloc(st->nfields, sizeof(*st->recvctx));
|
||||
for (i=0; i<st->nfields; i++) {
|
||||
st->recv[i] = fupg_type_lookup(st->text_results ? 0 : PQftype(r, i));
|
||||
if (!st->recv[i])
|
||||
fu_confess("Unable to receive query results of type %u", PQftype(r, i));
|
||||
}
|
||||
}
|
||||
|
||||
static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) {
|
||||
PGresult *r = st->result;
|
||||
if (PQgetisnull(r, row, col)) return newSV(0);
|
||||
int len = PQgetlength(st->result, row, col);
|
||||
const fupg_type *t = st->recv[col];
|
||||
if (t->len && len != t->len) fu_confess("invalid length for type %s: %d\n", t->name, len);
|
||||
return t->recv(aTHX_ PQgetvalue(r, row, col), len, st->recvctx[col]);
|
||||
}
|
||||
|
||||
static SV *fupg_st_exec(pTHX_ fupg_st *st) {
|
||||
|
|
@ -306,7 +326,7 @@ static SV *fupg_st_val(pTHX_ fupg_st *st) {
|
|||
if (PQnfields(st->describe) == 0) fu_confess("Invalid use of $st->val() on query returning no data");
|
||||
fupg_st_execute(aTHX_ st);
|
||||
if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->val() on query returning more than one row");
|
||||
SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_val(aTHX_ st->result, 0, 0);
|
||||
SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_st_getval(aTHX_ st, 0, 0);
|
||||
return sv_2mortal(sv);
|
||||
}
|
||||
|
||||
|
|
@ -316,24 +336,24 @@ static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) {
|
|||
if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows");
|
||||
if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row");
|
||||
if (GIMME_V != G_LIST) {
|
||||
ST(0) = sv_2mortal(newSViv(PQnfields(st->result)));
|
||||
ST(0) = sv_2mortal(newSViv(st->nfields));
|
||||
return 1;
|
||||
}
|
||||
int i, nfields = PQnfields(st->result);
|
||||
(void)POPs;
|
||||
EXTEND(SP, nfields);
|
||||
for (i=0; i<nfields; i++) mPUSHs(fupg_val(aTHX_ st->result, 0, i));
|
||||
return nfields;
|
||||
EXTEND(SP, st->nfields);
|
||||
int i;
|
||||
for (i=0; i<st->nfields; i++) mPUSHs(fupg_st_getval(aTHX_ st, 0, i));
|
||||
return st->nfields;
|
||||
}
|
||||
|
||||
static SV *fupg_st_rowa(pTHX_ fupg_st *st) {
|
||||
fupg_st_execute(aTHX_ st);
|
||||
if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows");
|
||||
if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row");
|
||||
int i, nfields = PQnfields(st->result);
|
||||
AV *av = newAV_alloc_x(nfields);
|
||||
AV *av = newAV_alloc_x(st->nfields);
|
||||
SV *sv = sv_2mortal(newRV_noinc((SV *)av));
|
||||
for (i=0; i<nfields; i++) av_push_simple(av, fupg_val(aTHX_ st->result, 0, i));
|
||||
int i;
|
||||
for (i=0; i<st->nfields; i++) av_push_simple(av, fupg_st_getval(aTHX_ st, 0, i));
|
||||
return sv;
|
||||
}
|
||||
|
||||
|
|
@ -343,12 +363,12 @@ static SV *fupg_st_rowh(pTHX_ fupg_st *st) {
|
|||
fupg_st_execute(aTHX_ st);
|
||||
if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowh() on query returning zero rows");
|
||||
if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row");
|
||||
int i, nfields = PQnfields(st->result);
|
||||
HV *hv = newHV();
|
||||
SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
|
||||
for (i=0; i<nfields; i++) {
|
||||
int i;
|
||||
for (i=0; i<st->nfields; i++) {
|
||||
const char *key = PQfname(st->result, i);
|
||||
hv_store(hv, key, -strlen(key), fupg_val(aTHX_ st->result, 0, i), 0);
|
||||
hv_store(hv, key, -strlen(key), fupg_st_getval(aTHX_ st, 0, i), 0);
|
||||
}
|
||||
return sv;
|
||||
}
|
||||
|
|
@ -358,11 +378,14 @@ static void fupg_st_destroy(fupg_st *st) {
|
|||
/* Ignore failure, this is just a best-effort attempt to free up resources on the backend */
|
||||
if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name));
|
||||
|
||||
if (st->query) safefree(st->query);
|
||||
for (i=0; i < st->bindn; i++) SvREFCNT_dec(st->bind[i]);
|
||||
if (st->bind) safefree(st->bind);
|
||||
/* XXX: These point into bind SVs (for now): for (i=0; i < st->paramn; i++) safefree(st->param[i]); */
|
||||
if (st->param) safefree(st->param);
|
||||
safefree(st->query);
|
||||
for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]);
|
||||
safefree(st->bind);
|
||||
/* XXX: These point into bind SVs (for now):
|
||||
* for (i=0; i < st->nparam; i++) safefree(st->param[i]); */
|
||||
safefree(st->param);
|
||||
safefree(st->recv);
|
||||
safefree(st->recvctx); /* XXX: Needs type-specific free() for the individual pointers */
|
||||
PQclear(st->describe);
|
||||
PQclear(st->result);
|
||||
SvREFCNT_dec(st->conn->self);
|
||||
|
|
@ -372,4 +395,5 @@ static void fupg_st_destroy(fupg_st *st) {
|
|||
|
||||
/* TODO: $st->alla, allh, flat, kvv, kva, kvh */
|
||||
/* TODO: Prepared statement caching */
|
||||
/* TODO: Binary format fetching & type handling */
|
||||
/* TODO: Binary format bind parameters */
|
||||
/* TODO: Custom type handling */
|
||||
|
|
|
|||
72
c/pgtypes.c
Normal file
72
c/pgtypes.c
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
/* Send function, takes a Perl value and should write the binary encoded
|
||||
* format into the given fustr. */
|
||||
typedef void (*fupg_send_fn)(pTHX_ SV *, fustr *, void *);
|
||||
|
||||
/* Receive function, takes a binary string and should return a Perl value.
|
||||
* libpq guarantees that the given buffer is aligned to MAXIMUM_ALIGNOF. */
|
||||
typedef SV *(*fupg_recv_fn)(pTHX_ const char *, int, void *);
|
||||
|
||||
typedef struct {
|
||||
Oid oid;
|
||||
int len;
|
||||
const char *name;
|
||||
fupg_send_fn send;
|
||||
fupg_recv_fn recv;
|
||||
} fupg_type;
|
||||
|
||||
|
||||
|
||||
#define RECVFN(name) static SV *fupg_recv_##name(pTHX_ const char *buf, int buflen __attribute__((unused)), void *data __attribute__((unused)))
|
||||
|
||||
RECVFN(textfmt) {
|
||||
return newSVpvn_utf8(buf, buflen, 1);
|
||||
}
|
||||
|
||||
RECVFN(bool) {
|
||||
return *buf ? &PL_sv_yes : &PL_sv_no;
|
||||
}
|
||||
|
||||
RECVFN(int2) {
|
||||
return newSViv((I16)__builtin_bswap16(*((U16 *)buf)));
|
||||
}
|
||||
|
||||
RECVFN(int4) {
|
||||
return newSViv((I32)__builtin_bswap32(*((U32 *)buf)));
|
||||
}
|
||||
|
||||
RECVFN(int8) {
|
||||
return newSViv((I64)__builtin_bswap64(*((U64 *)buf)));
|
||||
}
|
||||
|
||||
#undef RECVFN
|
||||
|
||||
|
||||
|
||||
#define R(name) fupg_recv_##name
|
||||
|
||||
/* Sorted by oid to support binary search.
|
||||
* (XXX: hash lookup might be faster, but requires codegen) */
|
||||
static const fupg_type fupg_types[] = {
|
||||
{ 0, 0, NULL, NULL, R(textfmt) }, /* Invalid Oid, abused for text format */
|
||||
{ 16, 1, "bool", NULL, R(bool) },
|
||||
{ 20, 8, "int8", NULL, R(int8) },
|
||||
{ 21, 2, "int2", NULL, R(int2) },
|
||||
{ 23, 4, "int4", NULL, R(int4) },
|
||||
};
|
||||
/* TODO: A LOT MORE TYPES */
|
||||
|
||||
#undef R
|
||||
|
||||
#define FUPG_TYPES (sizeof(fupg_types) / sizeof(fupg_type))
|
||||
|
||||
|
||||
static const fupg_type *fupg_type_lookup(Oid oid) {
|
||||
int i, b = 0, e = FUPG_TYPES-1;
|
||||
while (b <= e) {
|
||||
i = b + (e - b)/2;
|
||||
if (fupg_types[i].oid == oid) return fupg_types+i;
|
||||
if (fupg_types[i].oid < oid) b = i+1;
|
||||
else e = i-1;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
28
t/pgtypes.t
Normal file
28
t/pgtypes.t
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
use v5.36;
|
||||
use Test::More;
|
||||
no warnings 'experimental::builtin';
|
||||
use builtin qw/true false is_bool created_as_number/;
|
||||
|
||||
plan skip_all => $@ if !eval { require FU::PG; } && $@ =~ /Unable to load libpq/;
|
||||
die $@ if $@;
|
||||
plan skip_all => 'Please set FU_TEST_DB to a PostgreSQL connection string to run these tests' if !$ENV{FU_TEST_DB};
|
||||
|
||||
my $conn = FU::PG->connect($ENV{FU_TEST_DB});
|
||||
$conn->_debug_trace(0);
|
||||
|
||||
sub v($type, $v, $sql=$v) {
|
||||
$sql = "($sql)::$type";
|
||||
my $res = $conn->q("SELECT $sql")->text_results(0)->val;
|
||||
ok is_bool($res), "recv bool $sql" if $type eq 'bool';
|
||||
ok created_as_number($res), "recv number $sql" if $type =~ /^int/;
|
||||
is $res, $v, "recv value $sql";
|
||||
}
|
||||
|
||||
v bool => true, 'true';
|
||||
v bool => false, 'false';
|
||||
|
||||
v int2 => $_ for (1, -1, -32768, 32767, 12345, -12345);
|
||||
v int4 => $_ for (1, -1, -2147483648, 2147483647, 1234567890, -1234567890);
|
||||
v int8 => $_ for (1, -1, -9223372036854775808, 9223372036854775807, 1234567890123456789, -1234567890123456789);
|
||||
|
||||
done_testing;
|
||||
Loading…
Add table
Add a link
Reference in a new issue