From 1f7e2de9a0f731d11526299586a473f89787a09d Mon Sep 17 00:00:00 2001 From: Yorhel Date: Wed, 12 Feb 2025 17:08:22 +0100 Subject: [PATCH] pg: Add prepared statement caching The tests are not as thourough as I would like. There's many ways to mess this up. I was initially planning to drop the ref on the prepared statement immediately after executing the query, so that the $st object can be kept around for introspection without consuming excess resources. Unfortunately, PQcopyResult does not copy over information about bind parameters, so we need another way to keep that information alive. I ended up going for the simple solution: keep the ref on the prepared statement... --- FU.xs | 6 +++ c/pgconn.c | 105 +++++++++++++++++++++++++++++++++++++++++++++++++- c/pgst.c | 31 +++++++++++---- t/pgconnect.t | 58 +++++++++++++++++++++++++++- 4 files changed, 190 insertions(+), 10 deletions(-) diff --git a/FU.xs b/FU.xs index 545e4c6..95a42b5 100644 --- a/FU.xs +++ b/FU.xs @@ -116,6 +116,12 @@ void cache(fupg_conn *x, ...) CODE: FUPG_STFLAGS; +void cache_size(fupg_conn *c, unsigned int n) + CODE: + c->prep_max = n; + fupg_prepared_prune(c); + XSRETURN(1); + void disconnect(fupg_conn *c) CODE: fupg_conn_disconnect(c); diff --git a/c/pgconn.c b/c/pgconn.c index 469dfbf..42f9572 100644 --- a/c/pgconn.c +++ b/c/pgconn.c @@ -5,6 +5,27 @@ KHASHL_MAP_INIT(KH_LOCAL, fupg_records, fupg_records, Oid, fupg_record *, kh_hash_uint32, kh_eq_generic); +typedef struct fupg_prep fupg_prep; +struct fupg_prep { + khint_t hash; /* Cached kh_hash_str() of the query */ + int ref; /* How many active $st objects are using this */ + UV name; + fupg_prep *next, *prev; /* FIFO list for the LRU, only if ref=0 */ + char *query; + PGresult *describe; +}; + +#define fupg_prep_hash(p) ((p)->hash) +#define fupg_prep_eq(a, b) (strcmp((a)->query, (b)->query) == 0) +KHASHL_SET_INIT(KH_LOCAL, fupg_prepared, fupg_prepared, fupg_prep *, fupg_prep_hash, fupg_prep_eq); + +static void fupg_prep_destroy(fupg_prep *p) { + PQclear(p->describe); + safefree(p->query); + safefree(p); +} + + typedef struct { SV *self; PGconn *conn; @@ -13,8 +34,12 @@ typedef struct { UV cookie; /* currently active transaction object; 0 = none active */ int stflags; int ntypes; + unsigned int prep_max; + unsigned int prep_cur; /* Number of prepared statements not associated with an active $st object */ fupg_type *types; fupg_records *records; + fupg_prepared *prep_map; + fupg_prep *prep_head, *prep_tail; /* Inserted into head, removed at tail */ fustr buf; /* Scratch space for query params */ } fupg_conn; @@ -135,10 +160,14 @@ static SV *fupg_connect(pTHX_ const char *str) { fupg_conn *c = safemalloc(sizeof(fupg_conn)); c->conn = conn; c->prep_counter = c->cookie_counter = c->cookie = 0; - c->stflags = 0; + c->stflags = FUPG_CACHE; c->ntypes = 0; c->types = NULL; c->records = fupg_records_init(); + c->prep_cur = 0; + c->prep_max = 256; + c->prep_map = fupg_prepared_init(); + c->prep_head = c->prep_tail = NULL; fustr_init(&c->buf, NULL, SIZE_MAX); return fu_selfobj(c, "FU::Pg::conn"); } @@ -157,6 +186,8 @@ static const char *fupg_conn_status(fupg_conn *c) { static void fupg_conn_disconnect(fupg_conn *c) { PQfinish(c->conn); c->conn = NULL; + /* We don't have an API to reconnect with the same $conn object, so no need + * to clean up the prepared statement cache at this point. */ } static void fupg_conn_destroy(fupg_conn *c) { @@ -166,6 +197,8 @@ static void fupg_conn_destroy(fupg_conn *c) { khint_t k; kh_foreach(c->records, k) safefree(kh_val(c->records, k)); fupg_records_destroy(c->records); + kh_foreach(c->prep_map, k) fupg_prep_destroy(kh_key(c->prep_map, k)); + fupg_prepared_destroy(c->prep_map); safefree(c); } @@ -245,6 +278,76 @@ static void fupg_txn_destroy(pTHX_ fupg_txn *t) { +/* Prepared statement caching */ + +static void fupg_prepared_list_remove(fupg_conn *c, fupg_prep *p) { + if (p->next) p->next->prev = p->prev; + if (p->prev) p->prev->next = p->next; + if (c->prep_head == p) c->prep_head = p->next; + if (c->prep_tail == p) c->prep_tail = p->prev; + c->prep_cur--; +} + +static void fupg_prepared_list_unshift(fupg_conn *c, fupg_prep *p) { + p->next = c->prep_head; + p->prev = NULL; + c->prep_head = p; + if (p->next) p->next->prev = p; + else c->prep_tail = p; + c->prep_cur++; +} + +static void fupg_prepared_prune(fupg_conn *c) { + while (c->prep_cur > c->prep_max) { + fupg_prep *p = c->prep_tail; + fupg_prepared_list_remove(c, p); + assert(p->ref == 0); + + khint_t k = fupg_prepared_get(c->prep_map, p); + assert(k != kh_end(c->prep_map)); + fupg_prepared_del(c->prep_map, k); + + char name[64]; + snprintf(name, sizeof(name), "fupg%"UVuf, p->name); + PQclear(PQclosePrepared(c->conn, name)); + fupg_prep_destroy(p); + } +} + +/* Fetch and ref a prepared statement, returns a new object if nothing was cached */ +static fupg_prep *fupg_prepared_ref(fupg_conn *c, const char *query) { + fupg_prep prep; + prep.hash = kh_hash_str(query); + prep.query = (char *)query; + khint_t k = fupg_prepared_get(c->prep_map, &prep); + fupg_prep *p; + + if (k == kh_end(c->prep_map)) { + p = safecalloc(1, sizeof(*p)); + p->hash = prep.hash; + p->query = savepv(query); + p->ref = 1; + int i; + fupg_prepared_put(c->prep_map, p, &i); + + } else { + p = kh_key(c->prep_map, k); + if (!p->ref++) fupg_prepared_list_remove(c, p); + } + return p; +} + +static void fupg_prepared_unref(fupg_conn *c, fupg_prep *p) { + assert(p->ref > 0); + if (!--p->ref) { + fupg_prepared_list_unshift(c, p); + fupg_prepared_prune(c); + } +} + + + + /* Type handling */ /* XXX: It feels a bit wasteful to load *all* types; even on an empty database diff --git a/c/pgst.c b/c/pgst.c index 4fc099b..256a650 100644 --- a/c/pgst.c +++ b/c/pgst.c @@ -11,7 +11,8 @@ typedef struct { /* Set during prepare */ int prepared; char name[32]; - PGresult *describe; + fupg_prep *prep; + PGresult *describe; /* shared with prep->describe if prep is set */ /* Set during execute */ int nfields; @@ -50,8 +51,8 @@ static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I3 st->bind = safemalloc((argc-2) * sizeof(SV *)); I32 i; for (i=2; i < argc; i++) { - SvGETMAGIC(ST(i)); - st->bind[st->nbind] = SvREFCNT_inc(ST(i)); + st->bind[st->nbind] = newSV(0); + sv_setsv(st->bind[st->nbind], ST(i)); st->nbind++; } } @@ -61,8 +62,13 @@ static SV *fupg_q(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I3 static void fupg_st_destroy(fupg_st *st) { int i; - /* Ignore failure, this is just a best-effort attempt to free up resources on the backend */ - if (st->prepared) PQclear(PQclosePrepared(st->conn->conn, st->name)); + + if (st->prep) { + fupg_prepared_unref(st->conn, st->prep); + } else if (st->prepared) { + PQclear(st->describe); + PQclear(PQclosePrepared(st->conn->conn, st->name)); + } safefree(st->query); for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]); @@ -73,7 +79,6 @@ static void fupg_st_destroy(fupg_st *st) { if (st->recv) for (i=0; infields; i++) fupg_tio_free(st->recv + i); fupg_tio_free(&st->send); safefree(st->recv); - PQclear(st->describe); PQclear(st->result); SvREFCNT_dec(st->conn->self); safefree(st); @@ -83,9 +88,18 @@ static void fupg_st_prepare(pTHX_ fupg_st *st) { if (st->describe) return; if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement"); - /* TODO: This is where we check for any cached prepared statements */ + if (st->stflags & FUPG_CACHE) + st->prep = fupg_prepared_ref(st->conn, st->query); + if (st->prep && st->prep->describe) { + snprintf(st->name, sizeof(st->name), "fupg%"UVuf, st->prep->name); + st->describe = st->prep->describe; + st->prepared = 1; + return; + } - snprintf(st->name, sizeof(st->name), "fupg%"UVuf, ++st->conn->prep_counter); + st->conn->prep_counter++; + if (st->prep) st->prep->name = st->conn->prep_counter; + snprintf(st->name, sizeof(st->name), "fupg%"UVuf, st->conn->prep_counter); /* Send prepare + describe in a pipeline to avoid a double round-trip with the server */ PQenterPipelineMode(st->conn->conn); @@ -116,6 +130,7 @@ static void fupg_st_prepare(pTHX_ fupg_st *st) { PQclear(sync); fupg_result_croak(desc, "prepare", st->query); } + if (st->prep) st->prep->describe = desc; st->describe = desc; if (!sync) fupg_conn_croak(st->conn , "prepare"); diff --git a/t/pgconnect.t b/t/pgconnect.t index 2903195..fe3f11b 100644 --- a/t/pgconnect.t +++ b/t/pgconnect.t @@ -17,7 +17,7 @@ okerr FATAL => connect => qr/missing "=" after "invalid"/; ok FU::Pg::lib_version() > 100000; -my $conn = FU::Pg->connect($ENV{FU_TEST_DB})->text; +my $conn = FU::Pg->connect($ENV{FU_TEST_DB})->text->cache(0); $conn->_debug_trace(0); is ref $conn, 'FU::Pg::conn'; @@ -325,6 +325,62 @@ subtest 'txn', sub { is $conn->q('SELECT count(*) FROM fupg_tst WHERE id = 3')->val, 0; }; +{ + local $_ = 'x'; + my $st = $conn->q('SELECT $1', $_); + $_ = 'y'; + is $st->val, 'x', 'shallow copy'; +} + +{ + my $a = [1,2]; + my $st = $conn->q('SELECT $1::int[]', $a)->text(0); + $a->[1] = 3; + is_deeply $st->val, [1,3], 'not deep copy'; +} + +subtest 'Prepared statement cache', sub { + my $txn = $conn->cache_size(2)->txn->cache; + my sub numexec($sql) { + $txn->q('SELECT generic_plans + custom_plans FROM pg_prepared_statements WHERE statement = $1', $sql)->cache(0)->val + } + is $txn->q('SELECT 1')->val, 1; + is numexec('SELECT 1'), 1; + + my $sql = 'SELECT $1::int as a, $2::text as b'; + ok !defined numexec($sql); + + my $params = $txn->q($sql)->param_types; + is_deeply $params, [23, 25]; + is numexec($sql), 0; + my $cparams = $txn->q($sql)->param_types; + is_deeply $cparams, $params; + + my $cols = $txn->q($sql)->columns; + is_deeply $cols, [{ name => 'a', oid => 23 }, { name => 'b', oid => 25 }]; + my $ccols = $txn->q($sql)->columns; + is_deeply $ccols, $cols; + + $txn->q($sql, 0, '')->exec; + is numexec($sql), 1; + $txn->q($sql, 0, '')->exec; + is numexec($sql), 2; + + is numexec('SELECT 1'), 1; + $txn->q('SELECT 2')->exec; + ok !defined numexec('SELECT 1'); + is numexec('SELECT 2'), 1; + + $conn->cache_size(1); + ok !defined numexec('SELECT 1'); + ok !defined numexec($sql); + is numexec('SELECT 2'), 1; + + $conn->cache_size(0); + ok !defined numexec($sql); + ok !defined numexec('SELECT 2'); +}; + { my $st = $conn->q("SELECT 1"); undef $conn; # statement keeps the connection alive