From d9d2ad0434d8272fe32893825dad2538ac41bf78 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Mon, 10 Mar 2025 12:24:52 +0100 Subject: [PATCH] Pg: Add COPY support --- FU.xs | 38 +++++++++++++++++ FU/Pg.pm | 117 ++++++++++++++++++++++++++++++++++++++++------------- c/libpq.h | 6 ++- c/pgst.c | 79 ++++++++++++++++++++++++++++++++++++ t/pgcopy.t | 90 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 301 insertions(+), 29 deletions(-) create mode 100644 t/pgcopy.t diff --git a/FU.xs b/FU.xs index 3c6084a..1d63ffe 100644 --- a/FU.xs +++ b/FU.xs @@ -68,6 +68,7 @@ fuxmlwr * FUXMLWR fupg_conn * FUPG_CONN fupg_txn * FUPG_TXN fupg_st * FUPG_ST +fupg_copy * FUPG_COPY INPUT FUFCGI @@ -89,6 +90,10 @@ FUPG_TXN FUPG_ST if (sv_derived_from($arg, \"FU::Pg::st\")) $var = (fupg_st *)SvIVX(SvRV($arg)); else fu_confess(\"invalid statement object\"); + +FUPG_COPY + if (sv_derived_from($arg, \"FU::Pg::copy\")) $var = (fupg_copy *)SvIVX(SvRV($arg)); + else fu_confess(\"invalid COPY object\"); #" EOT @@ -233,6 +238,11 @@ void q(fupg_conn *c, SV *sv, ...) FUPG_CONN_COOKIE; ST(0) = fupg_q(aTHX_ c, c->stflags, SvPVutf8_nolen(sv), ax, items); +void copy(fupg_conn *c, SV *sv) + CODE: + FUPG_CONN_COOKIE; + ST(0) = fupg_copy_exec(aTHX_ c, SvPVutf8_nolen(sv)); + void _set_type(fupg_conn *c, SV *name, SV *sendsv, SV *recvsv) CODE: fupg_set_type(aTHX_ c, name, sendsv, recvsv); @@ -282,6 +292,12 @@ void q(fupg_txn *t, SV *sv, ...) FUPG_TXN_COOKIE; ST(0) = fupg_q(aTHX_ t->conn, t->stflags, SvPVutf8_nolen(sv), ax, items); +# XXX: The copy object should probably keep a ref on the transaction +void copy(fupg_txn *t, SV *sv) + CODE: + FUPG_TXN_COOKIE; + ST(0) = fupg_copy_exec(aTHX_ t->conn, SvPVutf8_nolen(sv)); + MODULE = FU PACKAGE = FU::Pg::st @@ -393,6 +409,28 @@ void DESTROY(fupg_st *st) fupg_st_destroy(aTHX_ st); +MODULE = FU PACKAGE = FU::Pg::copy + +void write(fupg_copy *c, SV *sv) + CODE: + fupg_copy_write(aTHX_ c, sv); + +void read(fupg_copy *c) + CODE: + ST(0) = fupg_copy_read(aTHX_ c, 0); + +void is_binary(fupg_copy *c) + CODE: + ST(0) = c->bin ? &PL_sv_yes : &PL_sv_no; + +void close(fupg_copy *c) + CODE: + fupg_copy_close(aTHX_ c, 0); + +void DESTROY(fupg_copy *c) + CODE: + fupg_copy_destroy(aTHX_ c); + MODULE = FU PACKAGE = FU::XMLWriter diff --git a/FU/Pg.pm b/FU/Pg.pm index 176a8ac..4bd4b23 100644 --- a/FU/Pg.pm +++ b/FU/Pg.pm @@ -112,6 +112,11 @@ Inside a transaction that is in an error state. The transaction must be rolled back in order to recover to a usable state. This happens automatically when the transaction object goes out of scope. +=item active + +Currently executing a query. This state can only be observed during a L. + =item bad Connection is dead or otherwise unusable. @@ -155,10 +160,11 @@ executing the query, but I the query results have been returned. The subroutine is (currently) only called for queries executed through C<< $conn->exec >>, C<< $conn->q >>, C<< $conn->Q >> and their C<$txn> variants; -internal queries performed by this module (such as for transaction management, -querying type information, etc) do not trigger the callback. Statements that -result in an error being thrown during or before execution are also not -traceable this way. This behavior might change in the future. +C<< $conn->copy >> statements and internal queries performed by this module +(such as for transaction management, querying type information, etc) do not +trigger the callback. Statements that result in an error being thrown during or +before execution are also not traceable this way. This behavior might change in +the future. =item $conn->disconnect @@ -519,6 +525,11 @@ current implementation does not track subtransactions that closely) A subtransaction is in error state and awaiting to be rolled back. +=item active + +Currently executing a query. This state can only be observed during a L. + =item bad Connection is dead or otherwise unusable. @@ -740,6 +751,71 @@ I Methods to convert between the various formats. I Methods to query type info. + +=head2 COPY support + +You can use L for efficient +bulk data transfers between your application and the PostgreSQL server: + +=over + +=item $copy = $conn->copy($statement) + +=item $copy = $txn->copy($statement) + +Execute C<$statement> and return a C object that lets you +transfer data to or from Postgres. + +It is not possible to execute any other queries on the same connection while a +copy operation is in progress. When used on a transaction object, C<$txn> must +be kept alive long enough to finish the copy operation. + +=back + +A C<$copy> object supports the following methods: + +=over + +=item $copy->is_binary + +Returns true if the transfer is performed in the binary format, false for text. + +=item $copy->write($data) + +Send C<$data> to the server. An error is thrown if this is not a C operation. An error may be thrown if C<$data> is not a valid format +understood by Postgres, but such errors can also be deferred to C. + +C<$data> is interpreted as a Perl Unicode string for textual transfers and as a +binary string for binary transfers. + +=item $copy->read + +Return the next row read from the Postgres server, or C if no more data +is coming. In the text format, a single line - including trailing newline - is +returned as a Perl Unicode string. In the binary format, a single row is +returned as a byte string. An error is thrown if this is not a C operation. + +=item $copy->close + +Marks the end of the copy operation. Does not return anything but throws an +error if something went wrong. + +It is possible to close a read-copy operation before all data has been +consumed, but that causes all data to still be read and discarded during +C. If you really want to interrupt a large read operation, a more +efficient approach is to call C<< $conn->close >> and discard the entire +connection. + +It is not I to call this method, simply letting the C<$copy> object +run out of scope will do the trick as well, but in that case errors are +silently discarded. An explicit C is recommended to catch errors. + +=back + + =head2 Errors All methods can throw an exception on error. When possible, the error message @@ -823,32 +899,17 @@ to it after C is always safe: =item * Only works with blocking (synchronous) calls, not very suitable for use in asynchronous frameworks unless you know your queries are fast and you have a -low-latency connection with the Postgres server. +low-latency connection with the Postgres server. This is unlikely to improve in +future versions, Perl's async story is somewhat awkward in general, and fully +supporting async operation might require a fundamental redesign of how this +module works. -=back +=item * LISTEN support is still missing. May be added in a future version, as +this seems doable without supporting full async. -Missing features: - -=over - -=item COPY support - -I hope to implement this someday. - -=item LISTEN support - -Would be nice to have, most likely doable without going full async. - -=item Asynchronous calls - -Probably won't happen. Perl's async story is slightly awkward in general, and -fully supporting async operation might require a fundamental redesign of how -this module works. It certainly won't I the implementation. - -=item Pipelining - -I have some ideas for an API, but doubt I'll ever implement it. Suffers from -the same awkwardness and complexity as asynchronous calls. +=item * Pipelining support is also missing. I have some ideas for an API, but +doubt I'll ever implement it. Suffers from the same awkwardness and complexity +as asynchronous calls. =back diff --git a/c/libpq.h b/c/libpq.h index f931016..94d817e 100644 --- a/c/libpq.h +++ b/c/libpq.h @@ -36,6 +36,7 @@ typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, P #define PG_DIAG_SOURCE_FUNCTION 'R' #define PG_FUNCS \ + X(PQbinaryTuples, int, const PGresult *) \ X(PQclear, void, PGresult *) \ X(PQclosePrepared, PGresult *, PGconn *, const char *) \ X(PQcmdTuples, char *, PGresult *) \ @@ -51,9 +52,10 @@ typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, P X(PQfname, char *, const PGresult *, int) \ X(PQfreemem, void, void *) \ X(PQftype, Oid, const PGresult *, int) \ + X(PQgetCopyData, int, PGconn *, char **, int) \ + X(PQgetResult, PGresult *, PGconn *) \ X(PQgetisnull, int, const PGresult *, int, int) \ X(PQgetlength, int, const PGresult *, int, int) \ - X(PQgetResult, PGresult *, PGconn *) \ X(PQgetvalue, char *, const PGresult *, int, int) \ X(PQlibVersion, int, void) \ X(PQnfields, int, const PGresult *) \ @@ -61,6 +63,8 @@ typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, P X(PQntuples, int, const PGresult *) \ X(PQparamtype, Oid, const PGresult *, int) \ X(PQpipelineSync, int, PGconn *) \ + X(PQputCopyData, int, PGconn *, const char *, int) \ + X(PQputCopyEnd, int, PGconn *, const char *) \ X(PQresStatus, char *, ExecStatusType) \ X(PQresultErrorField, char *, const PGresult *, int) \ X(PQresultErrorMessage, char *, const PGresult *) \ diff --git a/c/pgst.c b/c/pgst.c index 94c8072..1e01392 100644 --- a/c/pgst.c +++ b/c/pgst.c @@ -503,3 +503,82 @@ static SV *fupg_st_kvh(pTHX_ fupg_st *st) { } return sv; } + + + + +/* COPY support */ + +typedef struct { + SV *self; + fupg_conn *conn; + char in; + char bin; + char rddone; + char closed; +} fupg_copy; + +static SV *fupg_copy_exec(pTHX_ fupg_conn *c, const char *sql) { + PGresult *r = PQexec(c->conn, sql); + + if (!r) fupg_conn_croak(c, "exec"); + int s = PQresultStatus(r); + switch (s) { + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + break; + default: fupg_result_croak(r, "exec", sql); + } + + fupg_copy *copy = safecalloc(1, sizeof(fupg_copy)); + copy->conn = c; + SvREFCNT_inc(c->self); + copy->bin = !!PQbinaryTuples(r); + copy->in = s == PGRES_COPY_IN; + PQclear(r); + return fu_selfobj(copy, "FU::Pg::copy"); +} + +static void fupg_copy_write(pTHX_ fupg_copy *c, SV *data) { + STRLEN len; + const char *buf = c->bin ? SvPVbyte(data, len) : SvPVutf8(data, len); + if (PQputCopyData(c->conn->conn, buf, len) < 0) fupg_conn_croak(c->conn, "copy"); +} + +static SV *fupg_copy_read(pTHX_ fupg_copy *c, int discard) { + char *buf = NULL; + int len = PQgetCopyData(c->conn->conn, &buf, 0); + if (len == -1) { + c->rddone = 1; + return &PL_sv_undef; + } else if (len < 0) { + if (discard) c->rddone = 1; + else fupg_conn_croak(c->conn, "copy"); + } + SV *r = discard ? &PL_sv_undef : newSVpvn_flags(buf, len, SVs_TEMP | (c->bin ? 0 : SVf_UTF8)); + PQfreemem(buf); + return r; +} + +static void fupg_copy_close(pTHX_ fupg_copy *c, int ignerror) { + if (c->closed) return; + c->closed = 1; /* Mark as closed even on error, a second attempt won't help anyway */ + + if (c->in && PQputCopyEnd(c->conn->conn, NULL) < 0 && !ignerror) + fupg_conn_croak(c->conn, "copyEnd"); + + while (!c->in && !c->rddone) fupg_copy_read(aTHX_ c, 1); + + PGresult *r = PQgetResult(c->conn->conn); + if (!ignerror && !r) fupg_conn_croak(c->conn, "copyEnd"); + if (!ignerror && PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "copy", ""); + PQclear(r); + + while ((r = PQgetResult(c->conn->conn))) PQclear(r); +} + +static void fupg_copy_destroy(pTHX_ fupg_copy *c) { + fupg_copy_close(aTHX_ c, 1); + SvREFCNT_dec(c->conn->self); + safefree(c); +} diff --git a/t/pgcopy.t b/t/pgcopy.t new file mode 100644 index 0000000..9c81349 --- /dev/null +++ b/t/pgcopy.t @@ -0,0 +1,90 @@ +use v5.36; +use Test::More; + +plan skip_all => $@ if !eval { require FU::Pg; } && $@ =~ /Unable to load libpq/; +die $@ if $@; +plan skip_all => 'Please set FU_TEST_DB to a PostgreSQL connection string to run these tests' if !$ENV{FU_TEST_DB}; + +my $conn = FU::Pg->connect($ENV{FU_TEST_DB}); +$conn->_debug_trace(0); + +ok !eval { $conn->copy('SELECT 1') }; +like $@, qr/unexpected status code/; + +ok !eval { $conn->copy('COPX') }; +like $@, qr/syntax error/; + +$conn->exec('CREATE TEMPORARY TABLE fupg_copy_test (v int)'); + +is $conn->status, 'idle'; +{ + my $c = $conn->copy('COPY (SELECT 1) TO STDOUT'); + is $conn->status, 'active'; + $c->close; +} +is $conn->status, 'idle'; +$conn->copy('COPY (SELECT 1) TO STDOUT'); +is $conn->status, 'idle'; + +{ + my $c = $conn->copy('COPY fupg_copy_test FROM STDIN'); + is $conn->status, 'active'; + $c->close; +} +is $conn->status, 'idle'; +$conn->copy('COPY fupg_copy_test FROM STDIN'); +is $conn->status, 'idle'; + +{ + my $c = $conn->copy('COPY fupg_copy_test FROM STDIN'); + ok !$c->is_binary; + ok !eval { $c->{read} }; + $c->write("1"); + $c->write("\n2\n3\n"); + $c->close; + ok !eval { $c->read }; + ok !eval { $c->write('') }; + $c->close; +} +is $conn->status, 'idle'; + +{ + my $c = $conn->copy('COPY (SELECT * FROM fupg_copy_test ORDER BY v) TO STDOUT'); + ok !$c->is_binary; + ok !eval { $c->write('') }; + is $c->read, "1\n"; + is $c->read, "2\n"; + is $c->read, "3\n"; + is $c->read, undef; + $c->close; + ok !eval { $c->read }; + ok !eval { $c->write('') }; + $c->close; +} +is $conn->status, 'idle'; + +my $bin = ''; +{ + my $c = $conn->copy('COPY fupg_copy_test TO STDOUT (FORMAT binary)'); + ok $c->is_binary; + while (my $d = $c->read) { + $bin .= $d; + } + $c->close; +} +is $conn->status, 'idle'; + +{ + my $txn = $conn->txn; + my $c = $txn->copy('COPY fupg_copy_test FROM STDIN (FORMAT binary)'); + is $txn->status, 'active'; + ok $c->is_binary; + $c->write($bin); + $c->close; + + is $txn->q('SELECT sum(v) FROM fupg_copy_test')->val, 1+1+2+2+3+3; + $txn->rollback; +} +is $conn->q('SELECT sum(v) FROM fupg_copy_test')->val, 1+2+3; + +done_testing;