Pg: Add COPY support

This commit is contained in:
Yorhel 2025-03-10 12:24:52 +01:00
parent dc752e2a23
commit d9d2ad0434
5 changed files with 301 additions and 29 deletions

38
FU.xs
View file

@ -68,6 +68,7 @@ fuxmlwr * FUXMLWR
fupg_conn * FUPG_CONN
fupg_txn * FUPG_TXN
fupg_st * FUPG_ST
fupg_copy * FUPG_COPY
INPUT
FUFCGI
@ -89,6 +90,10 @@ FUPG_TXN
FUPG_ST
if (sv_derived_from($arg, \"FU::Pg::st\")) $var = (fupg_st *)SvIVX(SvRV($arg));
else fu_confess(\"invalid statement object\");
FUPG_COPY
if (sv_derived_from($arg, \"FU::Pg::copy\")) $var = (fupg_copy *)SvIVX(SvRV($arg));
else fu_confess(\"invalid COPY object\");
#"
EOT
@ -233,6 +238,11 @@ void q(fupg_conn *c, SV *sv, ...)
FUPG_CONN_COOKIE;
ST(0) = fupg_q(aTHX_ c, c->stflags, SvPVutf8_nolen(sv), ax, items);
void copy(fupg_conn *c, SV *sv)
CODE:
FUPG_CONN_COOKIE;
ST(0) = fupg_copy_exec(aTHX_ c, SvPVutf8_nolen(sv));
void _set_type(fupg_conn *c, SV *name, SV *sendsv, SV *recvsv)
CODE:
fupg_set_type(aTHX_ c, name, sendsv, recvsv);
@ -282,6 +292,12 @@ void q(fupg_txn *t, SV *sv, ...)
FUPG_TXN_COOKIE;
ST(0) = fupg_q(aTHX_ t->conn, t->stflags, SvPVutf8_nolen(sv), ax, items);
# XXX: The copy object should probably keep a ref on the transaction
void copy(fupg_txn *t, SV *sv)
CODE:
FUPG_TXN_COOKIE;
ST(0) = fupg_copy_exec(aTHX_ t->conn, SvPVutf8_nolen(sv));
MODULE = FU PACKAGE = FU::Pg::st
@ -393,6 +409,28 @@ void DESTROY(fupg_st *st)
fupg_st_destroy(aTHX_ st);
MODULE = FU PACKAGE = FU::Pg::copy
void write(fupg_copy *c, SV *sv)
CODE:
fupg_copy_write(aTHX_ c, sv);
void read(fupg_copy *c)
CODE:
ST(0) = fupg_copy_read(aTHX_ c, 0);
void is_binary(fupg_copy *c)
CODE:
ST(0) = c->bin ? &PL_sv_yes : &PL_sv_no;
void close(fupg_copy *c)
CODE:
fupg_copy_close(aTHX_ c, 0);
void DESTROY(fupg_copy *c)
CODE:
fupg_copy_destroy(aTHX_ c);
MODULE = FU PACKAGE = FU::XMLWriter

117
FU/Pg.pm
View file

@ -112,6 +112,11 @@ Inside a transaction that is in an error state. The transaction must be rolled
back in order to recover to a usable state. This happens automatically when the
transaction object goes out of scope.
=item active
Currently executing a query. This state can only be observed during a L<COPY
operation|/"COPY support">.
=item bad
Connection is dead or otherwise unusable.
@ -155,10 +160,11 @@ executing the query, but I<before> the query results have been returned.
The subroutine is (currently) only called for queries executed through C<<
$conn->exec >>, C<< $conn->q >>, C<< $conn->Q >> and their C<$txn> variants;
internal queries performed by this module (such as for transaction management,
querying type information, etc) do not trigger the callback. Statements that
result in an error being thrown during or before execution are also not
traceable this way. This behavior might change in the future.
C<< $conn->copy >> statements and internal queries performed by this module
(such as for transaction management, querying type information, etc) do not
trigger the callback. Statements that result in an error being thrown during or
before execution are also not traceable this way. This behavior might change in
the future.
=item $conn->disconnect
@ -519,6 +525,11 @@ current implementation does not track subtransactions that closely)
A subtransaction is in error state and awaiting to be rolled back.
=item active
Currently executing a query. This state can only be observed during a L<COPY
operation|/"COPY support">.
=item bad
Connection is dead or otherwise unusable.
@ -740,6 +751,71 @@ I<TODO:> Methods to convert between the various formats.
I<TODO:> Methods to query type info.
=head2 COPY support
You can use L<COPY
statements|https://www.postgresql.org/docs/current/sql-copy.html> for efficient
bulk data transfers between your application and the PostgreSQL server:
=over
=item $copy = $conn->copy($statement)
=item $copy = $txn->copy($statement)
Execute C<$statement> and return a C<FU::Pg::copy> object that lets you
transfer data to or from Postgres.
It is not possible to execute any other queries on the same connection while a
copy operation is in progress. When used on a transaction object, C<$txn> must
be kept alive long enough to finish the copy operation.
=back
A C<$copy> object supports the following methods:
=over
=item $copy->is_binary
Returns true if the transfer is performed in the binary format, false for text.
=item $copy->write($data)
Send C<$data> to the server. An error is thrown if this is not a C<COPY FROM
STDIN> operation. An error may be thrown if C<$data> is not a valid format
understood by Postgres, but such errors can also be deferred to C<close()>.
C<$data> is interpreted as a Perl Unicode string for textual transfers and as a
binary string for binary transfers.
=item $copy->read
Return the next row read from the Postgres server, or C<undef> if no more data
is coming. In the text format, a single line - including trailing newline - is
returned as a Perl Unicode string. In the binary format, a single row is
returned as a byte string. An error is thrown if this is not a C<COPY TO
STDOUT> operation.
=item $copy->close
Marks the end of the copy operation. Does not return anything but throws an
error if something went wrong.
It is possible to close a read-copy operation before all data has been
consumed, but that causes all data to still be read and discarded during
C<close()>. If you really want to interrupt a large read operation, a more
efficient approach is to call C<< $conn->close >> and discard the entire
connection.
It is not I<necessary> to call this method, simply letting the C<$copy> object
run out of scope will do the trick as well, but in that case errors are
silently discarded. An explicit C<close()> is recommended to catch errors.
=back
=head2 Errors
All methods can throw an exception on error. When possible, the error message
@ -823,32 +899,17 @@ to it after C<connect()> is always safe:
=item * Only works with blocking (synchronous) calls, not very suitable for use
in asynchronous frameworks unless you know your queries are fast and you have a
low-latency connection with the Postgres server.
low-latency connection with the Postgres server. This is unlikely to improve in
future versions, Perl's async story is somewhat awkward in general, and fully
supporting async operation might require a fundamental redesign of how this
module works.
=back
=item * LISTEN support is still missing. May be added in a future version, as
this seems doable without supporting full async.
Missing features:
=over
=item COPY support
I hope to implement this someday.
=item LISTEN support
Would be nice to have, most likely doable without going full async.
=item Asynchronous calls
Probably won't happen. Perl's async story is slightly awkward in general, and
fully supporting async operation might require a fundamental redesign of how
this module works. It certainly won't I<simplify> the implementation.
=item Pipelining
I have some ideas for an API, but doubt I'll ever implement it. Suffers from
the same awkwardness and complexity as asynchronous calls.
=item * Pipelining support is also missing. I have some ideas for an API, but
doubt I'll ever implement it. Suffers from the same awkwardness and complexity
as asynchronous calls.
=back

View file

@ -36,6 +36,7 @@ typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, P
#define PG_DIAG_SOURCE_FUNCTION 'R'
#define PG_FUNCS \
X(PQbinaryTuples, int, const PGresult *) \
X(PQclear, void, PGresult *) \
X(PQclosePrepared, PGresult *, PGconn *, const char *) \
X(PQcmdTuples, char *, PGresult *) \
@ -51,9 +52,10 @@ typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, P
X(PQfname, char *, const PGresult *, int) \
X(PQfreemem, void, void *) \
X(PQftype, Oid, const PGresult *, int) \
X(PQgetCopyData, int, PGconn *, char **, int) \
X(PQgetResult, PGresult *, PGconn *) \
X(PQgetisnull, int, const PGresult *, int, int) \
X(PQgetlength, int, const PGresult *, int, int) \
X(PQgetResult, PGresult *, PGconn *) \
X(PQgetvalue, char *, const PGresult *, int, int) \
X(PQlibVersion, int, void) \
X(PQnfields, int, const PGresult *) \
@ -61,6 +63,8 @@ typedef enum { PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, P
X(PQntuples, int, const PGresult *) \
X(PQparamtype, Oid, const PGresult *, int) \
X(PQpipelineSync, int, PGconn *) \
X(PQputCopyData, int, PGconn *, const char *, int) \
X(PQputCopyEnd, int, PGconn *, const char *) \
X(PQresStatus, char *, ExecStatusType) \
X(PQresultErrorField, char *, const PGresult *, int) \
X(PQresultErrorMessage, char *, const PGresult *) \

View file

@ -503,3 +503,82 @@ static SV *fupg_st_kvh(pTHX_ fupg_st *st) {
}
return sv;
}
/* COPY support */
typedef struct {
SV *self;
fupg_conn *conn;
char in;
char bin;
char rddone;
char closed;
} fupg_copy;
static SV *fupg_copy_exec(pTHX_ fupg_conn *c, const char *sql) {
PGresult *r = PQexec(c->conn, sql);
if (!r) fupg_conn_croak(c, "exec");
int s = PQresultStatus(r);
switch (s) {
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
break;
default: fupg_result_croak(r, "exec", sql);
}
fupg_copy *copy = safecalloc(1, sizeof(fupg_copy));
copy->conn = c;
SvREFCNT_inc(c->self);
copy->bin = !!PQbinaryTuples(r);
copy->in = s == PGRES_COPY_IN;
PQclear(r);
return fu_selfobj(copy, "FU::Pg::copy");
}
static void fupg_copy_write(pTHX_ fupg_copy *c, SV *data) {
STRLEN len;
const char *buf = c->bin ? SvPVbyte(data, len) : SvPVutf8(data, len);
if (PQputCopyData(c->conn->conn, buf, len) < 0) fupg_conn_croak(c->conn, "copy");
}
static SV *fupg_copy_read(pTHX_ fupg_copy *c, int discard) {
char *buf = NULL;
int len = PQgetCopyData(c->conn->conn, &buf, 0);
if (len == -1) {
c->rddone = 1;
return &PL_sv_undef;
} else if (len < 0) {
if (discard) c->rddone = 1;
else fupg_conn_croak(c->conn, "copy");
}
SV *r = discard ? &PL_sv_undef : newSVpvn_flags(buf, len, SVs_TEMP | (c->bin ? 0 : SVf_UTF8));
PQfreemem(buf);
return r;
}
static void fupg_copy_close(pTHX_ fupg_copy *c, int ignerror) {
if (c->closed) return;
c->closed = 1; /* Mark as closed even on error, a second attempt won't help anyway */
if (c->in && PQputCopyEnd(c->conn->conn, NULL) < 0 && !ignerror)
fupg_conn_croak(c->conn, "copyEnd");
while (!c->in && !c->rddone) fupg_copy_read(aTHX_ c, 1);
PGresult *r = PQgetResult(c->conn->conn);
if (!ignerror && !r) fupg_conn_croak(c->conn, "copyEnd");
if (!ignerror && PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "copy", "");
PQclear(r);
while ((r = PQgetResult(c->conn->conn))) PQclear(r);
}
static void fupg_copy_destroy(pTHX_ fupg_copy *c) {
fupg_copy_close(aTHX_ c, 1);
SvREFCNT_dec(c->conn->self);
safefree(c);
}

90
t/pgcopy.t Normal file
View file

@ -0,0 +1,90 @@
use v5.36;
use Test::More;
plan skip_all => $@ if !eval { require FU::Pg; } && $@ =~ /Unable to load libpq/;
die $@ if $@;
plan skip_all => 'Please set FU_TEST_DB to a PostgreSQL connection string to run these tests' if !$ENV{FU_TEST_DB};
my $conn = FU::Pg->connect($ENV{FU_TEST_DB});
$conn->_debug_trace(0);
ok !eval { $conn->copy('SELECT 1') };
like $@, qr/unexpected status code/;
ok !eval { $conn->copy('COPX') };
like $@, qr/syntax error/;
$conn->exec('CREATE TEMPORARY TABLE fupg_copy_test (v int)');
is $conn->status, 'idle';
{
my $c = $conn->copy('COPY (SELECT 1) TO STDOUT');
is $conn->status, 'active';
$c->close;
}
is $conn->status, 'idle';
$conn->copy('COPY (SELECT 1) TO STDOUT');
is $conn->status, 'idle';
{
my $c = $conn->copy('COPY fupg_copy_test FROM STDIN');
is $conn->status, 'active';
$c->close;
}
is $conn->status, 'idle';
$conn->copy('COPY fupg_copy_test FROM STDIN');
is $conn->status, 'idle';
{
my $c = $conn->copy('COPY fupg_copy_test FROM STDIN');
ok !$c->is_binary;
ok !eval { $c->{read} };
$c->write("1");
$c->write("\n2\n3\n");
$c->close;
ok !eval { $c->read };
ok !eval { $c->write('') };
$c->close;
}
is $conn->status, 'idle';
{
my $c = $conn->copy('COPY (SELECT * FROM fupg_copy_test ORDER BY v) TO STDOUT');
ok !$c->is_binary;
ok !eval { $c->write('') };
is $c->read, "1\n";
is $c->read, "2\n";
is $c->read, "3\n";
is $c->read, undef;
$c->close;
ok !eval { $c->read };
ok !eval { $c->write('') };
$c->close;
}
is $conn->status, 'idle';
my $bin = '';
{
my $c = $conn->copy('COPY fupg_copy_test TO STDOUT (FORMAT binary)');
ok $c->is_binary;
while (my $d = $c->read) {
$bin .= $d;
}
$c->close;
}
is $conn->status, 'idle';
{
my $txn = $conn->txn;
my $c = $txn->copy('COPY fupg_copy_test FROM STDIN (FORMAT binary)');
is $txn->status, 'active';
ok $c->is_binary;
$c->write($bin);
$c->close;
is $txn->q('SELECT sum(v) FROM fupg_copy_test')->val, 1+1+2+2+3+3;
$txn->rollback;
}
is $conn->q('SELECT sum(v) FROM fupg_copy_test')->val, 1+2+3;
done_testing;