fu/FU.xs
Yorhel 48fe393d5f FastCGI: Improve handling of EPIPE while writing response
Previously, that would result in the worker getting killed with SIGPIPE.
That works, but we can also recover from that error without restarting
the process.
2026-01-05 08:57:50 +01:00

537 lines
12 KiB
Text

#include <stdio.h>
#include <errno.h>
#include <time.h> /* struct timespec & clock_gettime() */
#include <string.h> /* strerror() */
#include <arpa/inet.h> /* inet_ntop(), inet_ntoa() */
#include <sys/socket.h> /* send(), fd passing */
#include <sys/un.h> /* fd passing */
#include <dlfcn.h> /* dlopen() etc */
#undef PERL_IMPLICIT_SYS
#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
/* Compatibility shims: each macro below is only defined here when the headers
 * included above did not already provide it. */

/* Fall back to plain av_push(). */
#if !defined(av_push_simple)
#  define av_push_simple av_push
#endif

/* Fall back to a generic truth test. */
#if !defined(BOOL_INTERNALS_sv_isbool_true)
#  define BOOL_INTERNALS_sv_isbool_true(x) SvTRUEx(x)
#endif

/* Fall back to copying PL_sv_yes. */
#if !defined(newSV_true)
#  define newSV_true() newSVsv(&PL_sv_yes)
#endif

/* Fall back to copying PL_sv_no. */
#if !defined(newSV_false)
#  define newSV_false() newSVsv(&PL_sv_no)
#endif
/* Disable key/value struct packing in khashl, so we can safely take a pointer
* to values inside the hash table. */
#define kh_packed
#include "c/khashl.h"
#include "c/common.c"
#include "c/compress.c"
#include "c/fcgi.c"
#include "c/fdpass.c"
#include "c/jsonfmt.c"
#include "c/jsonparse.c"
#include "c/xmlwr.c"
#include "c/libpq.h"
#include "c/pgtypes.c"
#include "c/pgconn.c"
#include "c/pgst.c"
/* Guard macros placed at the top of XSUB bodies. They compare the "cookie"
 * of the object at hand (the local variable `c`, `t` or `st`) with that of
 * its connection and raise an error through fu_confess() on a mismatch.
 *
 * Every macro is wrapped in do { } while(0) so that it expands to exactly one
 * statement; it can then be used safely as the body of an if/else. */

/* Expects a local `c`; errors when c->cookie is set. */
#define FUPG_CONN_COOKIE do {\
        if (c->cookie) fu_confess("Invalid operation on the top-level connection while a transaction object exists");\
    } while(0)

/* Expects a local `t`; errors when t->cookie is unset or differs from the
 * cookie of its connection. */
#define FUPG_TXN_COOKIE do {\
        if (!t->cookie) fu_confess("Invalid operation on a transaction that has already been marked as done");\
        if (t->cookie != t->conn->cookie) fu_confess("Invalid operation on transaction while a subtransaction object exists");\
    } while(0)

/* Expects a local `st`; errors when st->cookie differs from the cookie of its
 * connection. */
#define FUPG_ST_COOKIE do {\
        if (st->cookie != st->conn->cookie) fu_confess("Invalid cross-transaction operation on statement object");\
    } while(0)

/* Shared body of the cache()/text*() XSUBs. Expects a local `x` and the XS
 * variables `ix` and `items`. An `ix` of zero selects FUPG_CACHE. The flag is
 * set when no argument was given or the argument is true, cleared otherwise. */
#define FUPG_STFLAGS do {\
        if (!ix) ix = FUPG_CACHE;\
        if (items == 1 || SvTRUE(ST(1))) x->stflags |= ix; \
        else x->stflags &= ~ix; \
    } while(0)
MODULE = FU

PROTOTYPES: DISABLE

# Input typemaps for the object types used below. Every object is a reference
# to a scalar whose IV slot holds the C struct pointer. SvROK() is tested
# first because sv_derived_from() also succeeds for a plain class-name string,
# and SvRV() must never be applied to something that is not a reference.
TYPEMAP: <<EOT
TYPEMAP
fufcgi *    FUFCGI
fuxmlwr *   FUXMLWR
fupg_conn * FUPG_CONN
fupg_txn *  FUPG_TXN
fupg_st *   FUPG_ST
fupg_copy * FUPG_COPY
INPUT
FUFCGI
    if (SvROK($arg) && sv_derived_from($arg, \"FU::fcgi\")) $var = (fufcgi *)SvIVX(SvRV($arg));
    else fu_confess(\"invalid FastCGI object\");
FUXMLWR
    if (SvROK($arg) && sv_derived_from($arg, \"FU::XMLWriter\")) $var = (fuxmlwr *)SvIVX(SvRV($arg));
    else fu_confess(\"invalid FU::XMLWriter object\");
FUPG_CONN
    if (SvROK($arg) && sv_derived_from($arg, \"FU::Pg::conn\")) $var = (fupg_conn *)SvIVX(SvRV($arg));
    else fu_confess(\"invalid connection object\");
FUPG_TXN
    if (SvROK($arg) && sv_derived_from($arg, \"FU::Pg::txn\")) $var = (fupg_txn *)SvIVX(SvRV($arg));
    else fu_confess(\"invalid transaction object\");
FUPG_ST
    if (SvROK($arg) && sv_derived_from($arg, \"FU::Pg::st\")) $var = (fupg_st *)SvIVX(SvRV($arg));
    else fu_confess(\"invalid statement object\");
FUPG_COPY
    if (SvROK($arg) && sv_derived_from($arg, \"FU::Pg::copy\")) $var = (fupg_copy *)SvIVX(SvRV($arg));
    else fu_confess(\"invalid COPY object\");
#"
EOT
MODULE = FU PACKAGE = FU::Util

# Maps the fu_2bool() result onto a Perl value: undef when it is negative,
# true when it is non-zero and false when it is zero.
void to_bool(SV *val)
  PROTOTYPE: $
  CODE:
    int truth;
    SvGETMAGIC(val);
    truth = fu_2bool(aTHX_ val);
    if (truth < 0) ST(0) = &PL_sv_undef;
    else if (truth) ST(0) = &PL_sv_yes;
    else ST(0) = &PL_sv_no;

# Hands the complete argument list to fujson_fmt_xs() and returns its result.
void json_format(SV *val, ...)
  CODE:
    SV *formatted = fujson_fmt_xs(aTHX_ ax, items, val);
    ST(0) = formatted;

# Hands the complete argument list to fujson_parse_xs() and returns its result.
void json_parse(SV *val, ...)
  CODE:
    SV *parsed = fujson_parse_xs(aTHX_ ax, items, val);
    ST(0) = parsed;

# Returns the string reported by fugz_lib().
void gzip_lib()
  PROTOTYPE:
  CODE:
    const char *libname = fugz_lib();
    ST(0) = sv_2mortal(newSVpv(libname, 0));

# Returns the result of fugz_compress() for the given level and input.
void gzip_compress(IV level, SV *in)
  CODE:
    SV *packed = fugz_compress(aTHX_ level, in);
    ST(0) = packed;

# Returns the result of fubr_compress() for the given level and input.
void brotli_compress(IV level, SV *in)
  CODE:
    SV *packed = fubr_compress(aTHX_ level, in);
    ST(0) = packed;

# Passes the byte string of "data" together with both descriptors to
# fufdpass_send() and returns its result as an integer.
void fdpass_send(int socket, int fd, SV *data)
  CODE:
    STRLEN nbytes;
    const char *bytes = SvPVbyte(data, nbytes);
    IV status = fufdpass_send(socket, fd, bytes, nbytes);
    ST(0) = sv_2mortal(newSViv(status));

# fufdpass_recv() places its results on the stack and reports how many.
void fdpass_recv(int socket, UV len)
  CODE:
    IV nret = fufdpass_recv(aTHX_ ax, socket, len);
    XSRETURN(nret);
MODULE = FU PACKAGE = FU::fcgi

# Allocates a fufcgi context, stores the descriptor and process limit, zeroes
# the request and buffer counters and wraps it with fu_selfobj() as FU::fcgi.
void new(int fd, int maxproc)
  CODE:
    fufcgi *fc = safemalloc(sizeof(*fc));
    fc->fd = fd;
    fc->maxproc = maxproc;
    fc->off = 0;
    fc->len = 0;
    fc->keepconn = 0;
    fc->reqid = 0;
    ST(0) = fu_selfobj(fc, "FU::fcgi");

# Returns the integer result of fufcgi_read_req() and afterwards resets the
# len/off counters of the context.
void read_req(fufcgi *ctx, SV *headers, SV *params)
  CODE:
    IV status = fufcgi_read_req(aTHX_ ctx, headers, params);
    ST(0) = sv_2mortal(newSViv(status));
    ctx->len = 0;
    ctx->off = 0;

# True when ctx->keepconn is set, false otherwise.
void keepalive(fufcgi *ctx)
  CODE:
    if (ctx->keepconn) ST(0) = &PL_sv_yes;
    else ST(0) = &PL_sv_no;

# Passes the byte string of "sv" to fufcgi_print().
void print(fufcgi *ctx, SV *sv)
  CODE:
    STRLEN nbytes;
    const char *bytes = SvPVbyte(sv, nbytes);
    fufcgi_print(aTHX_ ctx, bytes, nbytes);

# Calls fufcgi_done() on the context.
void flush(fufcgi *ctx)
  CODE:
    fufcgi_done(aTHX_ ctx);

# Releases the context allocated by new().
void DESTROY(fufcgi *ctx)
  CODE:
    safefree(ctx);
MODULE = FU PACKAGE = FU::Pg

# Calls fupg_load() unless PQconnectdb is already non-NULL.
void _load_libpq()
  CODE:
    if (!PQconnectdb) fupg_load();

# Returns the value of PQlibVersion() as an integer.
void lib_version()
  CODE:
    XSRETURN_IV(PQlibVersion());

# Returns the result of fupg_connect() for the given connection string. The
# package name argument is accepted but not used.
void connect(const char *pkg, const char *conninfo)
  CODE:
    (void)pkg;
    ST(0) = fupg_connect(aTHX_ conninfo);
MODULE = FU PACKAGE = FU::Pg::conn

# Returns the value of PQserverVersion() for this connection as an integer.
void server_version(fupg_conn *c)
  CODE:
    XSRETURN_IV(PQserverVersion(c->conn));
# Turns libpq tracing to stderr on (PQtrace) or off (PQuntrace) and returns
# the object the method was called on.
void _debug_trace(fupg_conn *c, bool on)
  CODE:
    if (on) PQtrace(c->conn, stderr);
    else PQuntrace(c->conn);
    /* Return the invocant that is already in the first stack slot. c->self
     * is the referent that conn() wraps in a new reference, so handing it
     * out directly would give the caller the inner scalar, not an object. */
    XSRETURN(1);
# Stores "cb" as the trace callback of the connection, or removes the current
# one when "cb" is undefined.
void query_trace(fupg_conn *c, SV *cb)
  CODE:
    SV *previous = c->trace;
    /* Get magic may run arbitrary code and croak. Do it before the stored
     * pointer is touched, so that c->trace never refers to a released SV. */
    SvGETMAGIC(cb);
    c->trace = SvOK(cb) ? SvREFCNT_inc(cb) : NULL;
    /* Drop the old callback last; SvREFCNT_dec() accepts NULL. */
    SvREFCNT_dec(previous);
# Returns a new mortal reference to c->self, blessed into FU::Pg::conn.
void conn(fupg_conn *c)
  CODE:
    ST(0) = sv_newmortal();
    sv_setrv_inc(ST(0), c->self);
    sv_bless(ST(0), gv_stashpv("FU::Pg::conn", 0));

# Returns the string from fupg_conn_status() as a new scalar.
void status(fupg_conn *c)
  CODE:
    ST(0) = sv_2mortal(newSVpv(fupg_conn_status(c), 0));

# Escapes "v" with PQescapeLiteral(). The input is passed as UTF-8, the result
# is flagged as UTF-8 and the libpq buffer is freed once it has been copied.
# Calls fupg_conn_croak() when libpq returns NULL.
void escape_literal(fupg_conn *c, SV *v)
  CODE:
    STRLEN len;
    const char *str = SvPVutf8(v, len);
    char *r = PQescapeLiteral(c->conn, str, len);
    if (!r) fupg_conn_croak(c, "escapeLiteral");
    ST(0) = newSVpvn_flags(r, strlen(r), SVf_UTF8|SVs_TEMP);
    PQfreemem(r);

# Same as escape_literal(), but uses PQescapeIdentifier().
void escape_identifier(fupg_conn *c, SV *v)
  CODE:
    STRLEN len;
    const char *str = SvPVutf8(v, len);
    char *r = PQescapeIdentifier(c->conn, str, len);
    if (!r) fupg_conn_croak(c, "escapeIdentifier");
    ST(0) = newSVpvn_flags(r, strlen(r), SVf_UTF8|SVs_TEMP);
    PQfreemem(r);

# Sets or clears one flag in x->stflags (see FUPG_STFLAGS): set when called
# without an argument or with a true one, cleared otherwise. The alias picks
# the flag; the unaliased name uses FUPG_CACHE. Unlike FU::Pg::st::cache this
# variant has no explicit XSRETURN(1).
void cache(fupg_conn *x, ...)
  ALIAS:
    FU::Pg::conn::text_params = FUPG_TEXT_PARAMS
    FU::Pg::conn::text_results = FUPG_TEXT_RESULTS
    FU::Pg::conn::text = FUPG_TEXT
  CODE:
    FUPG_STFLAGS;

# Stores "n" in c->prep_max, calls fupg_prepared_prune() and returns the
# first stack slot (the invocant).
void cache_size(fupg_conn *c, unsigned int n)
  CODE:
    c->prep_max = n;
    fupg_prepared_prune(c);
    XSRETURN(1);

# Calls fupg_conn_disconnect().
void disconnect(fupg_conn *c)
  CODE:
    fupg_conn_disconnect(c);

# Calls fupg_conn_destroy().
void DESTROY(fupg_conn *c)
  CODE:
    fupg_conn_destroy(aTHX_ c);

# The four methods below refuse to run (FUPG_CONN_COOKIE) while c->cookie is
# set, and otherwise return whatever the named helper produces.
void txn(fupg_conn *c)
  CODE:
    FUPG_CONN_COOKIE;
    ST(0) = fupg_conn_txn(aTHX_ c);

# The query is passed to fupg_exec() as a UTF-8 string.
void exec(fupg_conn *c, SV *sv)
  CODE:
    FUPG_CONN_COOKIE;
    ST(0) = fupg_exec(aTHX_ c, SvPVutf8_nolen(sv));

# Passes the connection flags, the UTF-8 query and the full argument stack to
# fupg_sql().
void sql(fupg_conn *c, SV *sv, ...)
  CODE:
    FUPG_CONN_COOKIE;
    ST(0) = fupg_sql(aTHX_ c, c->stflags, SvPVutf8_nolen(sv), ax, items);

# The query is passed to fupg_copy_exec() as a UTF-8 string.
void copy(fupg_conn *c, SV *sv)
  CODE:
    FUPG_CONN_COOKIE;
    ST(0) = fupg_copy_exec(aTHX_ c, SvPVutf8_nolen(sv));

# Calls fupg_set_type() and returns the first stack slot (the invocant).
void _set_type(fupg_conn *c, SV *name, SV *sendsv, SV *recvsv)
  CODE:
    fupg_set_type(aTHX_ c, name, sendsv, recvsv);
    XSRETURN(1);

# Returns the result of fupg_perl2bin() for the given type oid and value.
void perl2bin(fupg_conn *c, int oid, SV *sv)
  CODE:
    ST(0) = fupg_perl2bin(aTHX_ c, oid, sv);

# Returns the result of fupg_bin2perl() for the given type oid and value.
void bin2perl(fupg_conn *c, int oid, SV *sv)
  CODE:
    ST(0) = fupg_bin2perl(aTHX_ c, oid, sv);

# Both conversions below share fupg_bintext(), which reports how many values
# it left on the stack; bin2text passes 0 as its second argument.
void bin2text(fupg_conn *c, ...)
  CODE:
    XSRETURN(fupg_bintext(aTHX_ c, 0, ax, items));

# Passes 1 as the second argument of fupg_bintext().
void text2bin(fupg_conn *c, ...)
  CODE:
    XSRETURN(fupg_bintext(aTHX_ c, 1, ax, items));
MODULE = FU PACKAGE = FU::Pg::txn

# Calls fupg_txn_destroy().
void DESTROY(fupg_txn *t)
  CODE:
    fupg_txn_destroy(aTHX_ t);

# Sets or clears one flag in x->stflags (see FUPG_STFLAGS): set when called
# without an argument or with a true one, cleared otherwise. The alias picks
# the flag; the unaliased name uses FUPG_CACHE.
void cache(fupg_txn *x, ...)
  ALIAS:
    FU::Pg::txn::text_params = FUPG_TEXT_PARAMS
    FU::Pg::txn::text_results = FUPG_TEXT_RESULTS
    FU::Pg::txn::text = FUPG_TEXT
  CODE:
    FUPG_STFLAGS;

# Returns a new mortal reference to the self scalar of the transaction's
# connection, blessed into FU::Pg::conn.
void conn(fupg_txn *t)
  CODE:
    ST(0) = sv_newmortal();
    sv_setrv_inc(ST(0), t->conn->self);
    sv_bless(ST(0), gv_stashpv("FU::Pg::conn", 0));

# Returns the string from fupg_txn_status() as a new scalar.
void status(fupg_txn *t)
  CODE:
    ST(0) = sv_2mortal(newSVpv(fupg_txn_status(t), 0));

# All methods below first run FUPG_TXN_COOKIE, which raises an error when
# t->cookie is unset or differs from the cookie of the connection.
void txn(fupg_txn *t)
  CODE:
    FUPG_TXN_COOKIE;
    ST(0) = fupg_txn_txn(aTHX_ t);

# Calls fupg_txn_commit().
void commit(fupg_txn *t)
  CODE:
    FUPG_TXN_COOKIE;
    fupg_txn_commit(t);

# Calls fupg_txn_rollback().
void rollback(fupg_txn *t)
  CODE:
    FUPG_TXN_COOKIE;
    fupg_txn_rollback(t);

# The query is passed to fupg_exec() as a UTF-8 string, using t->conn.
void exec(fupg_txn *t, SV *sv)
  CODE:
    FUPG_TXN_COOKIE;
    ST(0) = fupg_exec(aTHX_ t->conn, SvPVutf8_nolen(sv));

# Passes the transaction's flags, the UTF-8 query and the full argument stack
# to fupg_sql(), using t->conn.
void sql(fupg_txn *t, SV *sv, ...)
  CODE:
    FUPG_TXN_COOKIE;
    ST(0) = fupg_sql(aTHX_ t->conn, t->stflags, SvPVutf8_nolen(sv), ax, items);

# XXX: The copy object should probably keep a ref on the transaction
void copy(fupg_txn *t, SV *sv)
  CODE:
    FUPG_TXN_COOKIE;
    ST(0) = fupg_copy_exec(aTHX_ t->conn, SvPVutf8_nolen(sv));
MODULE = FU PACKAGE = FU::Pg::st

# Sets or clears one flag in x->stflags (see FUPG_STFLAGS) and returns the
# first stack slot (the invocant). When called under its unaliased name on a
# statement with x->prepared set, it raises an error instead.
# NOTE(review): the guard only applies when ix is 0, although its message
# speaks of statement configuration in general - confirm that the text_*
# aliases are meant to stay usable after preparation.
void cache(fupg_st *x, ...)
  ALIAS:
    FU::Pg::st::text_params = FUPG_TEXT_PARAMS
    FU::Pg::st::text_results = FUPG_TEXT_RESULTS
    FU::Pg::st::text = FUPG_TEXT
  CODE:
    if (ix == 0 && x->prepared) fu_confess("Invalid attempt to change statement configuration after it has already been prepared or executed");
    FUPG_STFLAGS;
    XSRETURN(1);

# Each method below first runs FUPG_ST_COOKIE, which raises an error when
# st->cookie differs from the cookie of the connection, and then returns
# whatever the fupg_st_*() helper of the same name produces.
void exec(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_exec(aTHX_ st);

void val(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_val(aTHX_ st);

# fupg_st_rowl() places its results on the stack and reports how many.
void rowl(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    XSRETURN(fupg_st_rowl(aTHX_ st, ax));

void rowa(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_rowa(aTHX_ st);

void rowh(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_rowh(aTHX_ st);

void alla(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_alla(aTHX_ st);

void allh(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_allh(aTHX_ st);

void flat(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_flat(aTHX_ st);

void kvv(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_kvv(aTHX_ st);

void kva(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_kva(aTHX_ st);

void kvh(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_kvh(aTHX_ st);

void param_types(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    ST(0) = fupg_st_param_types(aTHX_ st);
# Returns the result of fupg_st_param_values(). No cookie check is made here.
void param_values(fupg_st *st)
  CODE:
    ST(0) = fupg_st_param_values(aTHX_ st);
# Returns the result of fupg_st_columns() after the cookie check.
void columns(fupg_st *st)
  CODE:
    FUPG_ST_COOKIE;
    SV *cols = fupg_st_columns(aTHX_ st);
    ST(0) = cols;

# PQntuples() of the stored result, or undef when there is no result.
void nrows(fupg_st *st)
  CODE:
    if (st->result) ST(0) = sv_2mortal(newSViv(PQntuples(st->result)));
    else ST(0) = &PL_sv_undef;

# The query text as a mortal scalar flagged as UTF-8.
void query(fupg_st *st)
  CODE:
    const char *text = st->query;
    ST(0) = newSVpvn_flags(text, strlen(text), SVf_UTF8|SVs_TEMP);

# st->exectime as a number, or undef when it is not above zero.
void exec_time(fupg_st *st)
  CODE:
    if (st->exectime <= 0) ST(0) = &PL_sv_undef;
    else ST(0) = sv_2mortal(newSVnv(st->exectime));

# st->preptime as a number, or undef when st->prepared is not set.
void prepare_time(fupg_st *st)
  CODE:
    if (st->prepared) ST(0) = sv_2mortal(newSVnv(st->preptime));
    else ST(0) = &PL_sv_undef;

# Reports whether a flag is set in st->stflags. The alias picks the flag; the
# unaliased name tests FUPG_CACHE.
void get_cache(fupg_st *st)
  ALIAS:
    FU::Pg::st::get_text_params = FUPG_TEXT_PARAMS
    FU::Pg::st::get_text_results = FUPG_TEXT_RESULTS
  CODE:
    I32 flag = ix ? ix : FUPG_CACHE;
    if (st->stflags & flag) ST(0) = &PL_sv_yes;
    else ST(0) = &PL_sv_no;

# Calls fupg_st_destroy().
void DESTROY(fupg_st *st)
  CODE:
    fupg_st_destroy(aTHX_ st);
MODULE = FU PACKAGE = FU::Pg::copy

# Passes "sv" to fupg_copy_write().
void write(fupg_copy *c, SV *sv)
  CODE:
    fupg_copy_write(aTHX_ c, sv);

# Returns the result of fupg_copy_read(), called with 0 as its last argument.
void read(fupg_copy *c)
  CODE:
    ST(0) = fupg_copy_read(aTHX_ c, 0);

# True when c->bin is set, false otherwise.
void is_binary(fupg_copy *c)
  CODE:
    ST(0) = c->bin ? &PL_sv_yes : &PL_sv_no;

# Calls fupg_copy_close() with 0 as its last argument.
void close(fupg_copy *c)
  CODE:
    fupg_copy_close(aTHX_ c, 0);

# Calls fupg_copy_destroy().
void DESTROY(fupg_copy *c)
  CODE:
    fupg_copy_destroy(aTHX_ c);
MODULE = FU PACKAGE = FU::XMLWriter

# Returns the result of fuxmlwr_new().
void _new()
  CODE:
    ST(0) = fuxmlwr_new(aTHX);

# Returns the scalar produced by fustr_done() on wr->out and then
# re-initializes wr->out with fustr_init().
void _done(fuxmlwr *wr)
  CODE:
    ST(0) = sv_2mortal(fustr_done(&wr->out));
    fustr_init(&wr->out, NULL, SIZE_MAX);

# The three functions below operate on the writer in fuxmlwr_tail and raise
# an error when that pointer is NULL.

# Appends the UTF-8 bytes of "sv" to the output with fustr_write().
void lit_(SV *sv)
  CODE:
    if (!fuxmlwr_tail) fu_confess("No active FU::XMLWriter instance");
    STRLEN len;
    const char *buf = SvPVutf8(sv, len);
    fustr_write(&fuxmlwr_tail->out, buf, len);

# Passes "sv" to fuxmlwr_escape().
void txt_(SV *sv)
  CODE:
    if (!fuxmlwr_tail) fu_confess("No active FU::XMLWriter instance");
    fuxmlwr_escape(aTHX_ fuxmlwr_tail, sv);

# Passes the tag name to fuxmlwr_isname() and then, together with the
# argument stack, to fuxmlwr_tag().
void tag_(SV *sv, ...)
  CODE:
    if (!fuxmlwr_tail) fu_confess("No active FU::XMLWriter instance");
    STRLEN len;
    const char *tagname = SvPV(sv, len);
    fuxmlwr_isname(tagname);
    fuxmlwr_tag(aTHX_ fuxmlwr_tail, ax, 1, items, 0, tagname, len);

# Additional XS code is included from the output of running FU/XMLWriter.pm
# with $FU::XMLWriter::XSPRINT set.
INCLUDE_COMMAND: $^X -e '$FU::XMLWriter::XSPRINT=1; require "./FU/XMLWriter.pm"'

# Calls fuxmlwr_destroy().
void DESTROY(fuxmlwr *wr)
  CODE:
    fuxmlwr_destroy(aTHX_ wr);