FU: Add supervisor daemon thingy

A little tricky to get right, but it works pretty well.

TODO: Do something with --monitor.

I initially wanted to avoid the exec() and just let Perl continue
running the rest of the script after fork(), but that runs into the
problem that perl really doesn't like it when you fork() in BEGIN.
This commit is contained in:
Yorhel 2025-02-14 11:30:17 +01:00
parent 08410e56f5
commit 09fe50d2a2

171
FU.pm
View file

@ -1,6 +1,8 @@
package FU 0.1;
use v5.36;
use Carp 'confess';
use IO::Socket;
use POSIX;
sub import($pkg, @opt) {
@ -200,8 +202,6 @@ sub _http_read_request($sock, $req) {
my $run_config = undef;
sub _is_done($e) { ref $@ eq 'FU::err' && $@->[0] == 200 }
sub _log_err($e) {
@ -210,16 +210,16 @@ sub _log_err($e) {
warn $e =~ /\n$/ ? $e : "$e\n";
}
sub _do_req($sock) {
# TODO: check for changes if $run_config->{monitor}
sub _do_req($c) {
# TODO: check for changes if $c->{monitor}
local $REQ = {};
local $fu = bless {}, 'FU::obj';
$REQ->{ip} = $sock isa 'IO::Socket::INET' ? $sock->peerhost : '127.0.0.1';
$REQ->{ip} = $c->{client_sock} isa 'IO::Socket::INET' ? $c->{client_sock}->peerhost : '127.0.0.1';
fu->reset;
my $ok = eval {
_http_read_request($sock, $REQ);
_http_read_request($c->{client_sock}, $REQ);
for my $h (@before_request) { $h->() }
@ -261,24 +261,100 @@ sub _do_req($sock) {
} || _err_500();
}
fu->_flush($sock);
$sock->close;
fu->_flush($c->{client_sock});
$c->{client_sock}->close;
exit if $run_config->{max_reqs} && --$run_config->{max_reqs};
exit if $c->{max_reqs} && !--$c->{max_reqs};
}
sub _supervisor($c) {
my ($rsock, $wsock) = IO::Socket->socketpair(IO::Socket::AF_UNIX(), IO::Socket::SOCK_STREAM(), IO::Socket::PF_UNSPEC());
my %childs; # pid => 1: spawned, 2: signalled ready
$SIG{CHLD} = sub { $wsock->syswrite('c0000',1) };
$SIG{HUP} = $SIG{TERM} = $SIG{INT} = sub($sig,@) {
kill 'TERM', keys %childs;
return if $sig eq 'HUP';
$SIG{$sig} = undef;
kill $sig, $$;
exit 1;
};
require Fcntl;
fcntl $c->{listen_sock}, Fcntl::F_SETFD(), 0;
fcntl $wsock, Fcntl::F_SETFD(), 0;
my @child_cmd = (
$^X, (map "-I$_", @INC), $0,
$c->{monitor} ? '--monitor' : '--no-monitor',
$c->{max_reqs} ? "--max-reqs=$c->{max_reqs}" : (),
debug ? '--debug' : '--no-debug',
'--supervisor-fd='.fileno($wsock),
'--listen-fd='.fileno($c->{listen_sock}),
);
my $err = 0;
while (1) {
while ((my $pid = waitpid(-1, POSIX::WNOHANG())) > 0) {
$err = 1 if POSIX::WIFEXITED($?) && POSIX::WEXITSTATUS($?) != 0;
if (!$err && (!$childs{$pid} || $childs{$pid} != 2)) {
$err = 1;
warn "Script exited before calling FU::run()\n";
}
delete $childs{$pid};
}
# Don't bother spawning more than 1 at a time while in error state
my $spawn = !$err ? $c->{proc} - keys %childs : (grep $_ == 1, values %childs) ? 0 : 1;
for (1..$spawn) {
my $pid = fork;
die $! if !defined $pid;
if (!$pid) { # child
$SIG{CHLD} = $SIG{HUP} = $SIG{INT} = $SIG{TERM} = undef;
# In error state, wait with loading the script until we've received a request.
# Otherwise we'll end up in an infinite spawning loop if the script doesn't start properly.
my $sock;
if ($err) {
$sock = $c->{listen_sock}->accept() or die $!;
fcntl $sock, Fcntl::F_SETFD, 0 if $sock;
}
exec @child_cmd, $sock ? '--client-fd='.fileno($sock) : ();
exit 1;
}
$childs{$pid} = 1;
}
next if ($rsock->sysread(my $cmd, 5)//0) != 5;
next if $cmd eq 'c0000'; # child died
if ($cmd =~ /^r/) { # child ready
my $pid = unpack 'V', substr $cmd, 1;
$childs{$pid} = 2 if $childs{$pid};
$err = 0;
}
# TODO: Socket passing thing for autoreloading childs
}
}
sub _spawn {
return if $run_config && !@_; # already checked if we need to spawn
state %c = (
listen_sock => undef,
client_sock => undef,
supervisor_sock => undef,
init => 0,
);
return if $c{init} && !@_; # already checked if we need to spawn
$run_config = $_[0] || do {
my %c = (
http => $ENV{FU_HTTP} // '127.0.0.1:3000',
fcgi => $ENV{FU_FCGI},
proc => $ENV{FU_PROC} // 1,
monitor => $ENV{FU_MONITOR},
max_reqs => $ENV{FU_MAX_REQS},
);
%c = (%c, @_, init => 1) if @_ && defined $_[0];
if (!$c{init}++) {
$c{http} = $ENV{FU_HTTP} // '127.0.0.1:3000';
$c{fcgi} = $ENV{FU_FCGI};
$c{proc} = $ENV{FU_PROC} // 1;
$c{monitor} = $ENV{FU_MONITOR};
$c{max_reqs} = $ENV{FU_MAX_REQS};
debug = 1 if $ENV{FU_DEBUG};
for (@ARGV) {
$c{http} = $1 if /^--http=(.+)$/;
$c{fcgi} = $1 if /^--fcgi=(.+)$/;
@ -288,36 +364,47 @@ sub _spawn {
$c{max_reqs} = $1 if /^--max-reqs=([0-9]+)$/;
debug = 1 if /^--debug$/;
debug = 0 if /^--no-debug$/;
$c{listen_sock} = IO::Socket->new_from_fd($1, 'r') if /^--listen-fd=([0-9]+)$/;
$c{client_sock} = IO::Handle->new_from_fd($1, 'r+') if /^--client-fd=([0-9]+)$/;
$c{supervisor_sock} = IO::Handle->new_from_fd($1, 'w') if /^--supervisor-fd=([0-9]+)$/;
}
\%c;
};
# Single process, no need for a supervisor
my $need_supervisor = $run_config->{proc} > 1 || $run_config->{monitor} || $run_config->{max_reqs};
my $need_supervisor = !$c{supervisor_sock} && !$c{client_sock}
&& ($c{proc} > 1 || $c{monitor} || $c{max_reqs});
return if !@_ && !$need_supervisor;
require IO::Socket;
my $addr = $run_config->{fcgi} || $run_config->{http};
my $listen = IO::Socket->new(
Listen => 5,
Type => IO::Socket::SOCK_STREAM(),
$addr =~ m{^(unix:|/)(.+)$} ? do {
my $path = ($1 eq '/' ? '/' : '').$2;
unlink $path if -S $path;
+(Domain => IO::Socket::AF_UNIX(), Local => $path)
} : (
Domain => IO::Socket::AF_INET(),
ReuseAddr => 1,
Proto => 'tcp',
LocalAddr => $addr,
)
) or die "Unable to create listen socket: $!\n";
if (!$c{listen_sock}) {
my $addr = $c{fcgi} || $c{http};
$c{listen_sock} = IO::Socket->new(
Listen => 5,
Type => IO::Socket::SOCK_STREAM(),
$addr =~ m{^(unix:|/)(.+)$} ? do {
my $path = ($1 eq '/' ? '/' : '').$2;
unlink $path if -S $path;
+(Domain => IO::Socket::AF_UNIX(), Local => $path)
} : (
Domain => IO::Socket::AF_INET(),
ReuseAddr => 1,
Proto => 'tcp',
LocalAddr => $addr,
)
) or die "Unable to create listen socket: $!\n";
print "Listening on $addr\n" if debug;
}
# TODO: Spawn supervisor
print "Listening on $addr\n" if debug;
while (my $sock = $listen->accept) {
_do_req $sock;
if ($need_supervisor) {
_supervisor \%c;
} else {
$c{supervisor_sock}->syswrite('r'.pack 'V', $$) if $c{supervisor_sock};
my $stop = 0;
local $SIG{HUP} = 'IGNORE';
local $SIG{TERM} = $SIG{INT} = sub { $stop = 1 };
_do_req \%c if $c{client_sock};
while (!$stop) {
_do_req \%c if ($c{client_sock} = $c{listen_sock}->accept);
}
}
}
@ -449,7 +536,7 @@ sub _flush($, $sock) {
}
$sock->print("\r\n");
$sock->print($r->{resbody}) if fu->method ne 'HEAD' && $r->{status} != 204;
$sock->print($r->{resbody}) if (fu->method//'') ne 'HEAD' && $r->{status} != 204;
$sock->flush;
}