diff --git a/FU.xs b/FU.xs index 545e4c6..95a42b5 100644 --- a/FU.xs +++ b/FU.xs @@ -116,6 +116,12 @@ void cache(fupg_conn *x, ...) CODE: FUPG_STFLAGS; +void cache_size(fupg_conn *c, unsigned int n) + CODE: + c->prep_max = n; + fupg_prepared_prune(c); + XSRETURN(1); + void disconnect(fupg_conn *c) CODE: fupg_conn_disconnect(c); diff --git a/c/pgconn.c b/c/pgconn.c index 469dfbf..42f9572 100644 --- a/c/pgconn.c +++ b/c/pgconn.c @@ -5,6 +5,27 @@ KHASHL_MAP_INIT(KH_LOCAL, fupg_records, fupg_records, Oid, fupg_record *, kh_hash_uint32, kh_eq_generic); +typedef struct fupg_prep fupg_prep; +struct fupg_prep { + khint_t hash; /* Cached kh_hash_str() of the query */ + int ref; /* How many active $st objects are using this */ + UV name; + fupg_prep *next, *prev; /* FIFO list for the LRU, only if ref=0 */ + char *query; + PGresult *describe; +}; + +#define fupg_prep_hash(p) ((p)->hash) +#define fupg_prep_eq(a, b) (strcmp((a)->query, (b)->query) == 0) +KHASHL_SET_INIT(KH_LOCAL, fupg_prepared, fupg_prepared, fupg_prep *, fupg_prep_hash, fupg_prep_eq); + +static void fupg_prep_destroy(fupg_prep *p) { + PQclear(p->describe); + safefree(p->query); + safefree(p); +} + + typedef struct { SV *self; PGconn *conn; @@ -13,8 +34,12 @@ typedef struct { UV cookie; /* currently active transaction object; 0 = none active */ int stflags; int ntypes; + unsigned int prep_max; + unsigned int prep_cur; /* Number of prepared statements not associated with an active $st object */ fupg_type *types; fupg_records *records; + fupg_prepared *prep_map; + fupg_prep *prep_head, *prep_tail; /* Inserted into head, removed at tail */ fustr buf; /* Scratch space for query params */ } fupg_conn; @@ -135,10 +160,14 @@ static SV *fupg_connect(pTHX_ const char *str) { fupg_conn *c = safemalloc(sizeof(fupg_conn)); c->conn = conn; c->prep_counter = c->cookie_counter = c->cookie = 0; - c->stflags = 0; + c->stflags = FUPG_CACHE; c->ntypes = 0; c->types = NULL; c->records = fupg_records_init(); + c->prep_cur = 0; + c->prep_max = 256; + c->prep_map = fupg_prepared_init(); + c->prep_head = c->prep_tail = NULL; fustr_init(&c->buf, NULL, SIZE_MAX); return fu_selfobj(c, "FU::Pg::conn"); } @@ -157,6 +186,8 @@ static const char *fupg_conn_status(fupg_conn *c) { static void fupg_conn_disconnect(fupg_conn *c) { PQfinish(c->conn); c->conn = NULL; + /* We don't have an API to reconnect with the same $conn object, so no need + * to clean up the prepared statement cache at this point. */ } static void fupg_conn_destroy(fupg_conn *c) { @@ -166,6 +197,8 @@ static void fupg_conn_destroy(fupg_conn *c) { khint_t k; kh_foreach(c->records, k) safefree(kh_val(c->records, k)); fupg_records_destroy(c->records); + kh_foreach(c->prep_map, k) fupg_prep_destroy(kh_key(c->prep_map, k)); + fupg_prepared_destroy(c->prep_map); safefree(c); } @@ -245,6 +278,76 @@ static void fupg_txn_destroy(pTHX_ fupg_txn *t) { +/* Prepared statement caching */ + +static void fupg_prepared_list_remove(fupg_conn *c, fupg_prep *p) { + if (p->next) p->next->prev = p->prev; + if (p->prev) p->prev->next = p->next; + if (c->prep_head == p) c->prep_head = p->next; + if (c->prep_tail == p) c->prep_tail = p->prev; + c->prep_cur--; +} + +static void fupg_prepared_list_unshift(fupg_conn *c, fupg_prep *p) { + p->next = c->prep_head; + p->prev = NULL; + c->prep_head = p; + if (p->next) p->next->prev = p; + else c->prep_tail = p; + c->prep_cur++; +} + +static void fupg_prepared_prune(fupg_conn *c) { + while (c->prep_cur > c->prep_max) { + fupg_prep *p = c->prep_tail; + fupg_prepared_list_remove(c, p); + assert(p->ref == 0); + + khint_t k = fupg_prepared_get(c->prep_map, p); + assert(k != kh_end(c->prep_map)); + fupg_prepared_del(c->prep_map, k); + + char name[64]; + snprintf(name, sizeof(name), "fupg%"UVuf, p->name); + PQclear(PQclosePrepared(c->conn, name)); + fupg_prep_destroy(p); + } +} + +/* Fetch and ref a prepared statement, returns a new object if nothing was cached */ +static fupg_prep *fupg_prepared_ref(fupg_conn *c, const char *query) { + fupg_prep prep; + prep.hash = kh_hash_str(query); + prep.query = (char *)query; + khint_t k = fupg_prepared_get(c->prep_map, &prep); + fupg_prep *p; + + if (k == kh_end(c->prep_map)) { + p = safecalloc(1, sizeof(*p)); + p->hash = prep.hash; + p->query = savepv(query); + p->ref = 1; + int i; + fupg_prepared_put(c->prep_map, p, &i); + + } else { + p = kh_key(c->prep_map, k); + if (!p->ref++) fupg_prepared_list_remove(c, p); + } + return p; +} + +static void fupg_prepared_unref(fupg_conn *c, fupg_prep *p) { + assert(p->ref > 0); + if (!--p->ref) { + fupg_prepared_list_unshift(c, p); + fupg_prepared_prune(c); + } +} + + + + /* Type handling */ /* XXX: It feels a bit wasteful to load *all* types; even on an empty database diff --git a/c/pgst.c b/c/pgst.c index 4fc099b..256a650 100644 --- a/c/pgst.c +++ b/c/pgst.c @@ -11,7 +11,8 @@ typedef struct { /* Set during prepare */ int prepared; char name[32]; - PGresult *describe; + fupg_prep *prep; + PGresult *describe; /* shared with prep->describe if prep is set */ /* Set during execute */ int nfields; @@ -50,8 +51,8 @@ static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I3 st->bind = safemalloc((argc-2) * sizeof(SV *)); I32 i; for (i=2; i < argc; i++) { - SvGETMAGIC(ST(i)); - st->bind[st->nbind] = SvREFCNT_inc(ST(i)); + st->bind[st->nbind] = newSV(0); + sv_setsv(st->bind[st->nbind], ST(i)); st->nbind++; } } @@ -61,8 +62,13 @@ static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I3 static void fupg_st_destroy(fupg_st *st) { int i; - /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ - if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); + + if (st->prep) { + fupg_prepared_unref(st->conn, st->prep); + } else if (st->prepared) { + PQclear(st->describe); + PQclear(PQclosePrepared(st->conn->conn, st->name)); + } safefree(st->query); for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]); @@ -73,7 +79,6 @@ static void fupg_st_destroy(fupg_st *st) { if (st->recv) for (i=0; infields; i++) fupg_tio_free(st->recv + i); fupg_tio_free(&st->send); safefree(st->recv); - PQclear(st->describe); PQclear(st->result); SvREFCNT_dec(st->conn->self); safefree(st); @@ -83,9 +88,18 @@ static void fupg_st_prepare(pTHX_ fupg_st *st) { if (st->describe) return; if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement"); - /* TODO: This is where we check for any cached prepared statements */ + if (st->stflags & FUPG_CACHE) + st->prep = fupg_prepared_ref(st->conn, st->query); + if (st->prep && st->prep->describe) { + snprintf(st->name, sizeof(st->name), "fupg%"UVuf, st->prep->name); + st->describe = st->prep->describe; + st->prepared = 1; + return; + } - snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter); + st->conn->prep_counter++; + if (st->prep) st->prep->name = st->conn->prep_counter; + snprintf(st->name, sizeof(st->name), "fupg%"UVuf, st->conn->prep_counter); /* Send prepare + describe in a pipeline to avoid a double round-trip with the server */ PQenterPipelineMode(st->conn->conn); @@ -116,6 +130,7 @@ static void fupg_st_prepare(pTHX_ fupg_st *st) { PQclear(sync); fupg_result_croak(desc, "prepare", st->query); } + if (st->prep) st->prep->describe = desc; st->describe = desc; if (!sync) fupg_conn_croak(st->conn , "prepare"); diff --git a/t/pgconnect.t b/t/pgconnect.t index 2903195..fe3f11b 100644 --- a/t/pgconnect.t +++ b/t/pgconnect.t @@ -17,7 +17,7 @@ okerr FATAL => connect => qr/missing "=" after "invalid"/; ok FU::Pg::lib_version() > 100000; -my $conn = FU::Pg->connect($ENV{FU_TEST_DB})->text; +my $conn = FU::Pg->connect($ENV{FU_TEST_DB})->text->cache(0); $conn->_debug_trace(0); is ref $conn, 'FU::Pg::conn'; @@ -325,6 +325,62 @@ subtest 'txn', sub { is $conn->q('SELECT count(*) FROM fupg_tst WHERE id = 3')->val, 0; }; +{ + local $_ = 'x'; + my $st = $conn->q('SELECT $1', $_); + $_ = 'y'; + is $st->val, 'x', 'shallow copy'; +} + +{ + my $a = [1,2]; + my $st = $conn->q('SELECT $1::int[]', $a)->text(0); + $a->[1] = 3; + is_deeply $st->val, [1,3], 'not deep copy'; +} + +subtest 'Prepared statement cache', sub { + my $txn = $conn->cache_size(2)->txn->cache; + my sub numexec($sql) { + $txn->q('SELECT generic_plans + custom_plans FROM pg_prepared_statements WHERE statement = $1', $sql)->cache(0)->val + } + is $txn->q('SELECT 1')->val, 1; + is numexec('SELECT 1'), 1; + + my $sql = 'SELECT $1::int as a, $2::text as b'; + ok !defined numexec($sql); + + my $params = $txn->q($sql)->param_types; + is_deeply $params, [23, 25]; + is numexec($sql), 0; + my $cparams = $txn->q($sql)->param_types; + is_deeply $cparams, $params; + + my $cols = $txn->q($sql)->columns; + is_deeply $cols, [{ name => 'a', oid => 23 }, { name => 'b', oid => 25 }]; + my $ccols = $txn->q($sql)->columns; + is_deeply $ccols, $cols; + + $txn->q($sql, 0, '')->exec; + is numexec($sql), 1; + $txn->q($sql, 0, '')->exec; + is numexec($sql), 2; + + is numexec('SELECT 1'), 1; + $txn->q('SELECT 2')->exec; + ok !defined numexec('SELECT 1'); + is numexec('SELECT 2'), 1; + + $conn->cache_size(1); + ok !defined numexec('SELECT 1'); + ok !defined numexec($sql); + is numexec('SELECT 2'), 1; + + $conn->cache_size(0); + ok !defined numexec($sql); + ok !defined numexec('SELECT 2'); +}; + { my $st = $conn->q("SELECT 1"); undef $conn; # statement keeps the connection alive