diff --git a/FU.xs b/FU.xs index f583706..98a0063 100644 --- a/FU.xs +++ b/FU.xs @@ -16,6 +16,7 @@ #include "c/libpq.h" #include "c/pgtypes.c" #include "c/pgconn.c" +#include "c/pgst.c" #define FUPG_CONN_COOKIE \ @@ -227,6 +228,36 @@ void rowh(fupg_st *st) FUPG_ST_COOKIE; ST(0) = fupg_st_rowh(aTHX_ st); +void alla(fupg_st *st) + CODE: + FUPG_ST_COOKIE; + ST(0) = fupg_st_alla(aTHX_ st); + +void allh(fupg_st *st) + CODE: + FUPG_ST_COOKIE; + ST(0) = fupg_st_allh(aTHX_ st); + +void flat(fupg_st *st) + CODE: + FUPG_ST_COOKIE; + ST(0) = fupg_st_flat(aTHX_ st); + +void kvv(fupg_st *st) + CODE: + FUPG_ST_COOKIE; + ST(0) = fupg_st_kvv(aTHX_ st); + +void kva(fupg_st *st) + CODE: + FUPG_ST_COOKIE; + ST(0) = fupg_st_kva(aTHX_ st); + +void kvh(fupg_st *st) + CODE: + FUPG_ST_COOKIE; + ST(0) = fupg_st_kvh(aTHX_ st); + void DESTROY(fupg_st *st) CODE: fupg_st_destroy(st); diff --git a/c/pgconn.c b/c/pgconn.c index 18654b3..9b472f2 100644 --- a/c/pgconn.c +++ b/c/pgconn.c @@ -26,31 +26,6 @@ struct fupg_txn { char rollback_cmd[64]; }; -typedef struct { - /* Set on creation */ - SV *self; /* (unused, but whatever) */ - fupg_conn *conn; /* has a refcnt on conn->self */ - UV cookie; - char *query; - SV **bind; - int nbind; - int stflags; - - /* Set during prepare */ - int prepared; - char name[32]; - PGresult *describe; - - /* Set during execute */ - int nfields; - const char **param_values; /* Points into conn->buf or st->bind SVs, may be invalid after exec */ - int *param_lengths; - int *param_formats; - fupg_tio send; - fupg_tio *recv; - PGresult *result; -} fupg_st; - @@ -259,6 +234,11 @@ static void fupg_txn_destroy(pTHX_ fupg_txn *t) { safefree(t); } + + + +/* Type handling */ + /* XXX: It feels a bit wasteful to load *all* types; even on an empty database * that's ~55k of data, but it's easier and (potentially) faster than fetching * each type seperately as we encounter them. @@ -327,158 +307,35 @@ static const fupg_type *fupg_lookup_type(pTHX_ fupg_conn *c, int *refresh_done, return fupg_type_byoid(c->types, c->ntypes, oid); } +#define FUPGT_TEXT 1 +#define FUPGT_SEND 2 +#define FUPGT_RECV 4 - -/* Querying */ - -static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) { - PGresult *r = PQexec(c->conn, sql); - if (!r) fupg_conn_croak(c, "exec"); - switch (PQresultStatus(r)) { - case PGRES_EMPTY_QUERY: - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: break; - default: fupg_result_croak(r, "exec", sql); - } - SV *ret = fupg_exec_result(r); - PQclear(r); - return ret; -} - -static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I32 argc) { - fupg_st *st = safecalloc(1, sizeof(fupg_st)); - st->conn = c; - st->cookie = c->cookie; - st->stflags = stflags; - SvREFCNT_inc(c->self); - - st->query = savepv(query); - if (argc > 2) { - st->bind = safemalloc((argc-2) * sizeof(SV *)); - I32 i; - for (i=2; i < argc; i++) { - SvGETMAGIC(ST(i)); - st->bind[st->nbind] = SvREFCNT_inc(ST(i)); - st->nbind++; - } - } - - return fu_selfobj(st, "FU::PG::st"); -} - -static void fupg_st_prepare(pTHX_ fupg_st *st) { - if (st->describe) return; - if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement"); - - /* TODO: This is where we check for any cached prepared statements */ - - snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter); - - /* Send prepare + describe in a pipeline to avoid a double round-trip with the server */ - PQenterPipelineMode(st->conn->conn); - PQsendPrepare(st->conn->conn, st->name, st->query, 0, NULL); - PQsendDescribePrepared(st->conn->conn, st->name); - PQpipelineSync(st->conn->conn); - PGresult *prep = PQgetResult(st->conn->conn); PQgetResult(st->conn->conn); /* NULL */ - PGresult *desc = PQgetResult(st->conn->conn); PQgetResult(st->conn->conn); /* NULL */ - PGresult *sync = PQgetResult(st->conn->conn); - PQexitPipelineMode(st->conn->conn); - - if (!prep) { - PQclear(desc); PQclear(sync); - fupg_conn_croak(st->conn , "prepare"); - } - if (PQresultStatus(prep) != PGRES_COMMAND_OK) { - PQclear(desc); PQclear(sync); - fupg_result_croak(prep, "prepare", st->query); - } - PQclear(prep); - st->prepared = 1; - - if (!desc) { - PQclear(sync); - fupg_conn_croak(st->conn , "prepare"); - } - if (PQresultStatus(desc) != PGRES_COMMAND_OK) { - PQclear(sync); - fupg_result_croak(desc, "prepare", st->query); - } - st->describe = desc; - - if (!sync) fupg_conn_croak(st->conn , "prepare"); - if (PQresultStatus(sync) != PGRES_PIPELINE_SYNC) - fupg_result_croak(sync, "prepare", st->query); - PQclear(sync); -} - -static SV *fupg_st_params(pTHX_ fupg_st *st) { - fupg_st_prepare(aTHX_ st); - int i, nparams = PQnparams(st->describe); - AV *av = newAV_alloc_x(nparams); - for (i=0; idescribe, i))); - av_push_simple(av, newRV_noinc((SV *)hv)); - } - return sv_2mortal(newRV_noinc((SV *)av)); -} - -static SV *fupg_st_columns(pTHX_ fupg_st *st) { - fupg_st_prepare(aTHX_ st); - int i, ncols = PQnfields(st->describe); - AV *av = newAV_alloc_x(ncols); - for (i=0; idescribe, i); - hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1)); - hv_stores(hv, "oid", newSViv(PQftype(st->describe, i))); - int tmod = PQfmod(st->describe, i); - if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod)); - av_push_simple(av, newRV_noinc((SV *)hv)); - } - return sv_2mortal(newRV_noinc((SV *)av)); -} - -static void fupg_st_check_dupcols(pTHX_ PGresult *r) { - HV *hv = newHV(); - int i, nfields = PQnfields(r); - for (i=0; ioid = oid; - if (st->stflags & (issend ? FUPG_TEXT_PARAMS : FUPG_TEXT_RESULTS)) { + if (flags & FUPGT_TEXT) { tio->name = "{textfmt}"; tio->send = fupg_send_text; tio->recv = fupg_recv_text; return; } - const fupg_type *e, *t = fupg_lookup_type(aTHX_ st->conn, refresh_done, oid); + const fupg_type *e, *t = fupg_lookup_type(aTHX_ conn, refresh_done, oid); if (!t) fu_confess("No type found with oid %u", oid); if (!t->send || !t->recv) fu_confess("Unable to send or receive type '%s' (oid %u)", t->name, oid); tio->name = t->name; - if (issend ? t->send == fupg_send_domain : t->recv == fupg_recv_domain) { - e = fupg_lookup_type(aTHX_ st->conn, refresh_done, t->elemoid); + if (flags & FUPGT_SEND ? t->send == fupg_send_domain : t->recv == fupg_recv_domain) { + e = fupg_lookup_type(aTHX_ conn, refresh_done, t->elemoid); if (!e) fu_confess("Base type %u not found for domain '%s' (oid %u)", t->elemoid, t->name, t->oid); t = e; } tio->send = t->send; tio->recv = t->recv; - if (issend ? tio->send == fupg_send_array : tio->recv == fupg_recv_array) { + if (flags & FUPGT_SEND ? tio->send == fupg_send_array : tio->recv == fupg_recv_array) { tio->arrayelem = safecalloc(1, sizeof(*tio->arrayelem)); - fupg_tio_setup(aTHX_ st, tio->arrayelem, issend, t->elemoid, refresh_done); + fupg_tio_setup(aTHX_ conn, tio->arrayelem, flags, t->elemoid, refresh_done); } } @@ -489,177 +346,3 @@ static void fupg_tio_free(fupg_tio *tio) { safefree(tio->arrayelem); } } - -static void fupg_params_setup(pTHX_ fupg_st *st, int *refresh_done) { - int i; - st->param_values = safecalloc(st->nbind, sizeof(*st->param_values)); - if (st->stflags & FUPG_TEXT_PARAMS) { - for (i=0; inbind; i++) - st->param_values[i] = !SvOK(st->bind[i]) ? NULL : SvPVutf8_nolen(st->bind[i]); - return; - } - - fustr *buf = &st->conn->buf; - buf->cur = fustr_start(buf); - st->param_lengths = safecalloc(st->nbind, sizeof(*st->param_lengths)); - st->param_formats = safecalloc(st->nbind, sizeof(*st->param_formats)); - size_t off = 0; - for (i=0; inbind; i++) { - if (!SvOK(st->bind[i])) { - st->param_values[i] = NULL; - continue; - } - fupg_tio_setup(aTHX_ st, &st->send, 1, PQparamtype(st->describe, i), refresh_done); - off = fustr_len(buf); - st->send.send(aTHX_ &st->send, st->bind[i], buf); - fupg_tio_free(&st->send); - memset(&st->send, 0, sizeof(st->send)); - - st->param_lengths[i] = fustr_len(buf) - off; - st->param_formats[i] = 1; - st->param_values[i] = ""; - /* Don't write param_values here, the buffer may be invalidated when writing the next param */ - } - off = 0; - buf->cur = fustr_start(buf); - for (i=0; inbind; i++) { - if (st->param_values[i]) { - st->param_values[i] = buf->cur + off; - off += st->param_lengths[i]; - } - } -} - -static void fupg_st_execute(pTHX_ fupg_st *st) { - /* Disallow fetching the results more than once. I don't see a reason why - * someone would need that and disallowing it leaves room for fetching the - * results in a streaming fashion without breaking API compat. */ - if (st->result) fu_confess("Invalid attempt to execute statement multiple times"); - - /* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */ - fupg_st_prepare(aTHX_ st); - if (PQnparams(st->describe) != st->nbind) - fu_confess("Statement expects %d bind parameters but %d were given", PQnparams(st->describe), st->nbind); - int refresh_done = 0; - fupg_params_setup(aTHX_ st, &refresh_done); - - /* I'm not super fond of this approach. Storing the full query results in a - * PGresult involves unnecessary parsing, memory allocation and copying. - * The wire protocol is sufficiently simple that I could parse the query - * results directly from the network buffers without much additional code, - * and that would be much more efficient. Alas, libpq doesn't let me do - * that. - * There is the option of fetching results in chunked mode, but from what I - * gather that just saves a bit of memory in exchange for more and smaller - * malloc()/free()'s. Performance-wise, it probably won't be much of an - * improvement */ - PGresult *r = PQexecPrepared(st->conn->conn, - st->name, - st->nbind, - (const char * const *)st->param_values, - st->param_lengths, - st->param_formats, - st->stflags & FUPG_TEXT_RESULTS ? 0 : 1); - if (!r) fupg_conn_croak(st->conn , "exec"); - switch (PQresultStatus(r)) { - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: break; - default: fupg_result_croak(r, "exec", st->query); - } - st->result = r; - - st->nfields = PQnfields(r); - st->recv = safecalloc(st->nfields, sizeof(*st->recv)); - int i; - for (i=0; infields; i++) - fupg_tio_setup(aTHX_ st, st->recv + i, 0, PQftype(st->result, i), &refresh_done); -} - -static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) { - PGresult *r = st->result; - if (PQgetisnull(r, row, col)) return newSV(0); - const fupg_tio *ctx = st->recv+col; - return ctx->recv(aTHX_ ctx, PQgetvalue(r, row, col), PQgetlength(r, row, col)); -} - -static SV *fupg_st_exec(pTHX_ fupg_st *st) { - fupg_st_execute(aTHX_ st); - return fupg_exec_result(st->result); -} - -static SV *fupg_st_val(pTHX_ fupg_st *st) { - fupg_st_prepare(aTHX_ st); - if (PQnfields(st->describe) > 1) fu_confess("Invalid use of $st->val() on query returning more than one column"); - if (PQnfields(st->describe) == 0) fu_confess("Invalid use of $st->val() on query returning no data"); - fupg_st_execute(aTHX_ st); - if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->val() on query returning more than one row"); - SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_st_getval(aTHX_ st, 0, 0); - return sv_2mortal(sv); -} - -static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) { - dSP; - fupg_st_execute(aTHX_ st); - if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); - if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); - if (GIMME_V != G_LIST) { - ST(0) = sv_2mortal(newSViv(st->nfields)); - return 1; - } - (void)POPs; - EXTEND(SP, st->nfields); - int i; - for (i=0; infields; i++) mPUSHs(fupg_st_getval(aTHX_ st, 0, i)); - return st->nfields; -} - -static SV *fupg_st_rowa(pTHX_ fupg_st *st) { - fupg_st_execute(aTHX_ st); - if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); - if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); - AV *av = newAV_alloc_x(st->nfields); - SV *sv = sv_2mortal(newRV_noinc((SV *)av)); - int i; - for (i=0; infields; i++) av_push_simple(av, fupg_st_getval(aTHX_ st, 0, i)); - return sv; -} - -static SV *fupg_st_rowh(pTHX_ fupg_st *st) { - fupg_st_prepare(aTHX_ st); - fupg_st_check_dupcols(aTHX_ st->describe); - fupg_st_execute(aTHX_ st); - if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowh() on query returning zero rows"); - if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row"); - HV *hv = newHV(); - SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); - int i; - for (i=0; infields; i++) { - const char *key = PQfname(st->result, i); - hv_store(hv, key, -strlen(key), fupg_st_getval(aTHX_ st, 0, i), 0); - } - return sv; -} - -static void fupg_st_destroy(fupg_st *st) { - int i; - /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ - if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); - - safefree(st->query); - for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]); - safefree(st->bind); - safefree(st->param_values); - safefree(st->param_lengths); - safefree(st->param_formats); - if (st->recv) for (i=0; infields; i++) fupg_tio_free(st->recv + i); - fupg_tio_free(&st->send); - safefree(st->recv); - PQclear(st->describe); - PQclear(st->result); - SvREFCNT_dec(st->conn->self); - safefree(st); -} - - -/* TODO: $st->alla, allh, flat, kvv, kva, kvh */ -/* TODO: Prepared statement caching */ diff --git a/c/pgst.c b/c/pgst.c new file mode 100644 index 0000000..952d72a --- /dev/null +++ b/c/pgst.c @@ -0,0 +1,421 @@ +typedef struct { + /* Set on creation */ + SV *self; /* (unused, but whatever) */ + fupg_conn *conn; /* has a refcnt on conn->self */ + UV cookie; + char *query; + SV **bind; + int nbind; + int stflags; + + /* Set during prepare */ + int prepared; + char name[32]; + PGresult *describe; + + /* Set during execute */ + int nfields; + const char **param_values; /* Points into conn->buf or st->bind SVs, may be invalid after exec */ + int *param_lengths; + int *param_formats; + fupg_tio send; + fupg_tio *recv; + PGresult *result; +} fupg_st; + + +static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) { + PGresult *r = PQexec(c->conn, sql); + if (!r) fupg_conn_croak(c, "exec"); + switch (PQresultStatus(r)) { + case PGRES_EMPTY_QUERY: + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: break; + default: fupg_result_croak(r, "exec", sql); + } + SV *ret = fupg_exec_result(r); + PQclear(r); + return ret; +} + +static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I32 argc) { + fupg_st *st = safecalloc(1, sizeof(fupg_st)); + st->conn = c; + st->cookie = c->cookie; + st->stflags = stflags; + SvREFCNT_inc(c->self); + + st->query = savepv(query); + if (argc > 2) { + st->bind = safemalloc((argc-2) * sizeof(SV *)); + I32 i; + for (i=2; i < argc; i++) { + SvGETMAGIC(ST(i)); + st->bind[st->nbind] = SvREFCNT_inc(ST(i)); + st->nbind++; + } + } + + return fu_selfobj(st, "FU::PG::st"); +} + +static void fupg_st_destroy(fupg_st *st) { + int i; + /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ + if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); + + safefree(st->query); + for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]); + safefree(st->bind); + safefree(st->param_values); + safefree(st->param_lengths); + safefree(st->param_formats); + if (st->recv) for (i=0; infields; i++) fupg_tio_free(st->recv + i); + fupg_tio_free(&st->send); + safefree(st->recv); + PQclear(st->describe); + PQclear(st->result); + SvREFCNT_dec(st->conn->self); + safefree(st); +} + +static void fupg_st_prepare(pTHX_ fupg_st *st) { + if (st->describe) return; + if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement"); + + /* TODO: This is where we check for any cached prepared statements */ + + snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter); + + /* Send prepare + describe in a pipeline to avoid a double round-trip with the server */ + PQenterPipelineMode(st->conn->conn); + PQsendPrepare(st->conn->conn, st->name, st->query, 0, NULL); + PQsendDescribePrepared(st->conn->conn, st->name); + PQpipelineSync(st->conn->conn); + PGresult *prep = PQgetResult(st->conn->conn); PQgetResult(st->conn->conn); /* NULL */ + PGresult *desc = PQgetResult(st->conn->conn); PQgetResult(st->conn->conn); /* NULL */ + PGresult *sync = PQgetResult(st->conn->conn); + PQexitPipelineMode(st->conn->conn); + + if (!prep) { + PQclear(desc); PQclear(sync); + fupg_conn_croak(st->conn , "prepare"); + } + if (PQresultStatus(prep) != PGRES_COMMAND_OK) { + PQclear(desc); PQclear(sync); + fupg_result_croak(prep, "prepare", st->query); + } + PQclear(prep); + st->prepared = 1; + + if (!desc) { + PQclear(sync); + fupg_conn_croak(st->conn , "prepare"); + } + if (PQresultStatus(desc) != PGRES_COMMAND_OK) { + PQclear(sync); + fupg_result_croak(desc, "prepare", st->query); + } + st->describe = desc; + + if (!sync) fupg_conn_croak(st->conn , "prepare"); + if (PQresultStatus(sync) != PGRES_PIPELINE_SYNC) + fupg_result_croak(sync, "prepare", st->query); + PQclear(sync); +} + +static SV *fupg_st_params(pTHX_ fupg_st *st) { + fupg_st_prepare(aTHX_ st); + int i, nparams = PQnparams(st->describe); + AV *av = newAV_alloc_x(nparams); + for (i=0; idescribe, i))); + av_push_simple(av, newRV_noinc((SV *)hv)); + } + return sv_2mortal(newRV_noinc((SV *)av)); +} + +static SV *fupg_st_columns(pTHX_ fupg_st *st) { + fupg_st_prepare(aTHX_ st); + int i, nfields = PQnfields(st->describe); + AV *av = newAV_alloc_x(nfields); + for (i=0; idescribe, i); + hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1)); + hv_stores(hv, "oid", newSViv(PQftype(st->describe, i))); + int tmod = PQfmod(st->describe, i); + if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod)); + av_push_simple(av, newRV_noinc((SV *)hv)); + } + return sv_2mortal(newRV_noinc((SV *)av)); +} + +static void fupg_params_setup(pTHX_ fupg_st *st, int *refresh_done) { + int i; + st->param_values = safecalloc(st->nbind, sizeof(*st->param_values)); + if (st->stflags & FUPG_TEXT_PARAMS) { + for (i=0; inbind; i++) + st->param_values[i] = !SvOK(st->bind[i]) ? NULL : SvPVutf8_nolen(st->bind[i]); + return; + } + + fustr *buf = &st->conn->buf; + buf->cur = fustr_start(buf); + st->param_lengths = safecalloc(st->nbind, sizeof(*st->param_lengths)); + st->param_formats = safecalloc(st->nbind, sizeof(*st->param_formats)); + size_t off = 0; + for (i=0; inbind; i++) { + if (!SvOK(st->bind[i])) { + st->param_values[i] = NULL; + continue; + } + fupg_tio_setup(aTHX_ st->conn, &st->send, + FUPGT_SEND | (st->stflags & FUPG_TEXT_PARAMS ? FUPGT_TEXT : 0), + PQparamtype(st->describe, i), refresh_done); + off = fustr_len(buf); + st->send.send(aTHX_ &st->send, st->bind[i], buf); + fupg_tio_free(&st->send); + memset(&st->send, 0, sizeof(st->send)); + + st->param_lengths[i] = fustr_len(buf) - off; + st->param_formats[i] = 1; + st->param_values[i] = ""; + /* Don't write param_values here, the buffer may be invalidated when writing the next param */ + } + off = 0; + buf->cur = fustr_start(buf); + for (i=0; inbind; i++) { + if (st->param_values[i]) { + st->param_values[i] = buf->cur + off; + off += st->param_lengths[i]; + } + } +} + +static void fupg_st_execute(pTHX_ fupg_st *st) { + /* Disallow fetching the results more than once. I don't see a reason why + * someone would need that and disallowing it leaves room for fetching the + * results in a streaming fashion without breaking API compat. */ + if (st->result) fu_confess("Invalid attempt to execute statement multiple times"); + + /* TODO: prepare can be skipped when prepared statement caching is disabled and (text-format queries or no bind params) */ + fupg_st_prepare(aTHX_ st); + if (PQnparams(st->describe) != st->nbind) + fu_confess("Statement expects %d bind parameters but %d were given", PQnparams(st->describe), st->nbind); + int refresh_done = 0; + fupg_params_setup(aTHX_ st, &refresh_done); + + /* I'm not super fond of this approach. Storing the full query results in a + * PGresult involves unnecessary parsing, memory allocation and copying. + * The wire protocol is sufficiently simple that I could parse the query + * results directly from the network buffers without much additional code, + * and that would be much more efficient. Alas, libpq doesn't let me do + * that. + * There is the option of fetching results in chunked mode, but from what I + * gather that just saves a bit of memory in exchange for more and smaller + * malloc()/free()'s. Performance-wise, it probably won't be much of an + * improvement */ + PGresult *r = PQexecPrepared(st->conn->conn, + st->name, + st->nbind, + (const char * const *)st->param_values, + st->param_lengths, + st->param_formats, + st->stflags & FUPG_TEXT_RESULTS ? 0 : 1); + if (!r) fupg_conn_croak(st->conn , "exec"); + switch (PQresultStatus(r)) { + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: break; + default: fupg_result_croak(r, "exec", st->query); + } + st->result = r; + + st->nfields = PQnfields(r); + st->recv = safecalloc(st->nfields, sizeof(*st->recv)); + int i; + for (i=0; infields; i++) + fupg_tio_setup(aTHX_ st->conn, st->recv + i, + FUPGT_RECV | (st->stflags & FUPG_TEXT_RESULTS ? FUPGT_TEXT : 0), + PQftype(st->result, i), &refresh_done); +} + +static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) { + PGresult *r = st->result; + if (PQgetisnull(r, row, col)) return newSV(0); + const fupg_tio *ctx = st->recv+col; + return ctx->recv(aTHX_ ctx, PQgetvalue(r, row, col), PQgetlength(r, row, col)); +} + +static void fupg_st_check_dupcols(pTHX_ fupg_st *st, int start) { + PGresult *r = st->result; + HV *hv = newHV(); + sv_2mortal((SV *)hv); + int i, nfields = PQnfields(r); + for (i=start; iresult); +} + +static SV *fupg_st_val(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + if (st->nfields > 1) fu_confess("Invalid use of $st->val() on query returning more than one column"); + if (st->nfields == 0) fu_confess("Invalid use of $st->val() on query returning no data"); + if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->val() on query returning more than one row"); + SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_st_getval(aTHX_ st, 0, 0); + return sv_2mortal(sv); +} + +static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) { + dSP; + fupg_st_execute(aTHX_ st); + if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); + if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); + if (GIMME_V != G_LIST) { + ST(0) = sv_2mortal(newSViv(st->nfields)); + return 1; + } + (void)POPs; + EXTEND(SP, st->nfields); + int i; + for (i=0; infields; i++) mPUSHs(fupg_st_getval(aTHX_ st, 0, i)); + return st->nfields; +} + +static SV *fupg_st_rowa(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowl() on query returning zero rows"); + if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row"); + AV *av = newAV_alloc_x(st->nfields); + SV *sv = sv_2mortal(newRV_noinc((SV *)av)); + int i; + for (i=0; infields; i++) av_push_simple(av, fupg_st_getval(aTHX_ st, 0, i)); + return sv; +} + +static SV *fupg_st_rowh(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + fupg_st_check_dupcols(aTHX_ st, 0); + if (PQntuples(st->result) == 0) fu_confess("Invalid use of $st->rowh() on query returning zero rows"); + if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row"); + HV *hv = newHV(); + SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); + int i; + for (i=0; infields; i++) { + const char *key = PQfname(st->result, i); + hv_store(hv, key, -strlen(key), fupg_st_getval(aTHX_ st, 0, i), 0); + } + return sv; +} + +static SV *fupg_st_alla(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + int i, j, nrows = PQntuples(st->result); + AV *av = newAV_alloc_x(nrows); + SV *sv = sv_2mortal(newRV_noinc((SV *)av)); + for (i=0; infields); + av_push_simple(av, newRV_noinc((SV *)row)); + for (j=0; jnfields; j++) + av_push_simple(row, fupg_st_getval(aTHX_ st, i, j)); + } + return sv; +} + +static SV *fupg_st_allh(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + fupg_st_check_dupcols(aTHX_ st, 0); + int i, j, nrows = PQntuples(st->result); + AV *av = newAV_alloc_x(nrows); + SV *sv = sv_2mortal(newRV_noinc((SV *)av)); + for (i=0; infields; j++) { + const char *key = PQfname(st->result, j); + hv_store(row, key, -strlen(key), fupg_st_getval(aTHX_ st, i, j), 0); + } + } + return sv; +} + +static SV *fupg_st_flat(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + int i, j, nrows = PQntuples(st->result); + AV *av = newAV_alloc_x(nrows); + SV *sv = sv_2mortal(newRV_noinc((SV *)av)); + for (i=0; infields; j++) + av_push_simple(av, fupg_st_getval(aTHX_ st, i, j)); + } + return sv; +} + +static SV *fupg_st_kvv(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + if (st->nfields > 2) fu_confess("Invalid use of $st->kvv() on query returning more than two columns"); + if (st->nfields == 0) fu_confess("Invalid use of $st->kvv() on query returning no data"); + int i, nrows = PQntuples(st->result); + HV *hv = newHV(); + SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); + for (i=0; ikvv() query results", SvPV_nolen(key)); + hv_store_ent(hv, key, st->nfields == 1 ? &PL_sv_yes : fupg_st_getval(aTHX_ st, i, 1), 0); + } + return sv; +} + +static SV *fupg_st_kva(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + if (st->nfields == 0) fu_confess("Invalid use of $st->kva() on query returning no data"); + int i, j, nrows = PQntuples(st->result); + HV *hv = newHV(); + SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); + for (i=0; ikva() query results", SvPV_nolen(key)); + AV *row = newAV_alloc_x(st->nfields); + hv_store_ent(hv, key, newRV_noinc((SV *)row), 0); + for (j=1; jnfields; j++) + av_push_simple(row, fupg_st_getval(aTHX_ st, i, j)); + } + return sv; +} + +static SV *fupg_st_kvh(pTHX_ fupg_st *st) { + fupg_st_execute(aTHX_ st); + fupg_st_check_dupcols(aTHX_ st, 1); + if (st->nfields == 0) fu_confess("Invalid use of $st->kvh() on query returning no data"); + int i, j, nrows = PQntuples(st->result); + HV *hv = newHV(); + SV *sv = sv_2mortal(newRV_noinc((SV *)hv)); + for (i=0; ikvh() query results", SvPV_nolen(key)); + HV *row = newHV(); + hv_store_ent(hv, key, newRV_noinc((SV *)row), 0); + for (j=1; jnfields; j++) { + const char *key = PQfname(st->result, j); + hv_store(row, key, -strlen(key), fupg_st_getval(aTHX_ st, i, j), 0); + } + } + return sv; +} diff --git a/t/pgconnect.t b/t/pgconnect.t index 351f56c..f620095 100644 --- a/t/pgconnect.t +++ b/t/pgconnect.t @@ -144,6 +144,74 @@ subtest '$st->rowh', sub { is_deeply $conn->q('SELECT 1 as a, $1::int as b', undef)->rowh, {a => 1, b => undef}; }; +subtest '$st->alla', sub { + is_deeply $conn->q('SELECT 1 WHERE false')->alla, []; + is_deeply $conn->q('SELECT')->alla, [[]]; + is_deeply $conn->q('SELECT 1')->alla, [[1]]; + is_deeply $conn->q('SELECT 1, null UNION ALL SELECT NULL, 2')->alla, [[1,undef],[undef,2]]; +}; + +subtest '$st->allh', sub { + ok !eval { $conn->q('SELECT 1 as a, 2 as a')->allh; 1 }; + like $@, qr/Query returns multiple columns with the same name/; + + is_deeply $conn->q('SELECT 1 WHERE false')->allh, []; + is_deeply $conn->q('SELECT')->allh, [{}]; + is_deeply $conn->q('SELECT 1 a')->allh, [{a=>1}]; + is_deeply $conn->q('SELECT 1 a, null b UNION ALL SELECT NULL, 2')->allh, [{a=>1,b=>undef},{a=>undef,b=>2}]; +}; + +subtest '$st->flat', sub { + is_deeply $conn->q('SELECT 1 WHERE false')->flat, []; + is_deeply $conn->q('SELECT')->flat, []; + is_deeply $conn->q('SELECT 1')->flat, [1]; + is_deeply $conn->q('SELECT 1, null UNION ALL SELECT NULL, 2')->flat, [1,undef,undef,2]; +}; + +subtest '$st->kvv', sub { + ok !eval { $conn->q('SELECT')->kvv; 1; }; + like $@, qr/returning no data/; + + ok !eval { $conn->q('SELECT 1, 2, 3')->kvv; 1; }; + like $@, qr/returning more than two columns/; + + ok !eval { $conn->q('SELECT 1 UNION ALL SELECT 1')->kvv; 1; }; + like $@, qr/is duplicated/; + + is_deeply $conn->q('SELECT 1 WHERE false')->kvv, {}; + is_deeply $conn->q('SELECT 1')->kvv, {1=>1}; + is_deeply $conn->q('SELECT 1, null UNION ALL SELECT 3, 2')->kvv, {1=>undef,3=>2}; +}; + +subtest '$st->kva', sub { + ok !eval { $conn->q('SELECT')->kva; 1; }; + like $@, qr/returning no data/; + + ok !eval { $conn->q('SELECT 1 UNION ALL SELECT 1')->kva; 1; }; + like $@, qr/is duplicated/; + + is_deeply $conn->q('SELECT 1 WHERE false')->kva, {}; + is_deeply $conn->q('SELECT 1')->kva, {1=>[]}; + is_deeply $conn->q("SELECT 1, null, 'hi' UNION ALL SELECT 3, 2, 'ok'")->kva, + {1=>[undef,'hi'], 3=>[2, 'ok']}; +}; + +subtest '$st->kvh', sub { + ok !eval { $conn->q('SELECT')->kvh; 1; }; + like $@, qr/returning no data/; + + ok !eval { $conn->q('SELECT 1 UNION ALL SELECT 1')->kvh; 1; }; + like $@, qr/is duplicated/; + + ok !eval { $conn->q('SELECT 1, 2, 3')->kvh; 1; }; + like $@, qr/Query returns multiple columns with the same name/; + + is_deeply $conn->q('SELECT 1 WHERE false')->kvh, {}; + is_deeply $conn->q('SELECT 1')->kvh, {1=>{}}; + is_deeply $conn->q("SELECT 1 as a , null as a, 'hi' as b UNION ALL SELECT 3, 2, 'ok'")->kvh, + {1=>{a=>undef,b=>'hi'}, 3=>{a=>2,b=>'ok'}}; +}; + subtest 'txn', sub { $conn->exec('CREATE TEMPORARY TABLE fupg_tst (id int)'); $conn->txn->exec('INSERT INTO fupg_tst VALUES (1)'); # rolled back