diff --git a/FU.pm b/FU.pm index 41e886b..30ae4b4 100644 --- a/FU.pm +++ b/FU.pm @@ -1,6 +1,8 @@ package FU 0.1; use v5.36; use Carp 'confess'; +use IO::Socket; +use POSIX; sub import($pkg, @opt) { @@ -200,8 +202,6 @@ sub _http_read_request($sock, $req) { -my $run_config = undef; - sub _is_done($e) { ref $@ eq 'FU::err' && $@->[0] == 200 } sub _log_err($e) { @@ -210,16 +210,16 @@ sub _log_err($e) { warn $e =~ /\n$/ ? $e : "$e\n"; } -sub _do_req($sock) { - # TODO: check for changes if $run_config->{monitor} +sub _do_req($c) { + # TODO: check for changes if $c->{monitor} local $REQ = {}; local $fu = bless {}, 'FU::obj'; - $REQ->{ip} = $sock isa 'IO::Socket::INET' ? $sock->peerhost : '127.0.0.1'; + $REQ->{ip} = $c->{client_sock} isa 'IO::Socket::INET' ? $c->{client_sock}->peerhost : '127.0.0.1'; fu->reset; my $ok = eval { - _http_read_request($sock, $REQ); + _http_read_request($c->{client_sock}, $REQ); for my $h (@before_request) { $h->() } @@ -261,24 +261,100 @@ sub _do_req($sock) { } || _err_500(); } - fu->_flush($sock); - $sock->close; + fu->_flush($c->{client_sock}); + $c->{client_sock}->close; - exit if $run_config->{max_reqs} && --$run_config->{max_reqs}; + exit if $c->{max_reqs} && !--$c->{max_reqs}; +} + +sub _supervisor($c) { + my ($rsock, $wsock) = IO::Socket->socketpair(IO::Socket::AF_UNIX(), IO::Socket::SOCK_STREAM(), IO::Socket::PF_UNSPEC()); + + my %childs; # pid => 1: spawned, 2: signalled ready + $SIG{CHLD} = sub { $wsock->syswrite('c0000',1) }; + $SIG{HUP} = $SIG{TERM} = $SIG{INT} = sub($sig,@) { + kill 'TERM', keys %childs; + return if $sig eq 'HUP'; + $SIG{$sig} = undef; + kill $sig, $$; + exit 1; + }; + + require Fcntl; + fcntl $c->{listen_sock}, Fcntl::F_SETFD(), 0; + fcntl $wsock, Fcntl::F_SETFD(), 0; + + my @child_cmd = ( + $^X, (map "-I$_", @INC), $0, + $c->{monitor} ? '--monitor' : '--no-monitor', + $c->{max_reqs} ? "--max-reqs=$c->{max_reqs}" : (), + debug ? '--debug' : '--no-debug', + '--supervisor-fd='.fileno($wsock), + '--listen-fd='.fileno($c->{listen_sock}), + ); + + my $err = 0; + while (1) { + while ((my $pid = waitpid(-1, POSIX::WNOHANG())) > 0) { + $err = 1 if POSIX::WIFEXITED($?) && POSIX::WEXITSTATUS($?) != 0; + if (!$err && (!$childs{$pid} || $childs{$pid} != 2)) { + $err = 1; + warn "Script exited before calling FU::run()\n"; + } + delete $childs{$pid}; + } + + # Don't bother spawning more than 1 at a time while in error state + my $spawn = !$err ? $c->{proc} - keys %childs : (grep $_ == 1, values %childs) ? 0 : 1; + for (1..$spawn) { + my $pid = fork; + die $! if !defined $pid; + if (!$pid) { # child + $SIG{CHLD} = $SIG{HUP} = $SIG{INT} = $SIG{TERM} = undef; + # In error state, wait with loading the script until we've received a request. + # Otherwise we'll end up in an infinite spawning loop if the script doesn't start properly. + my $sock; + if ($err) { + $sock = $c->{listen_sock}->accept() or die $!; + fcntl $sock, Fcntl::F_SETFD, 0 if $sock; + } + exec @child_cmd, $sock ? '--client-fd='.fileno($sock) : (); + exit 1; + } + $childs{$pid} = 1; + } + + next if ($rsock->sysread(my $cmd, 5)//0) != 5; + next if $cmd eq 'c0000'; # child died + + if ($cmd =~ /^r/) { # child ready + my $pid = unpack 'V', substr $cmd, 1; + $childs{$pid} = 2 if $childs{$pid}; + $err = 0; + } + + # TODO: Socket passing thing for autoreloading childs + } } sub _spawn { - return if $run_config && !@_; # already checked if we need to spawn + state %c = ( + listen_sock => undef, + client_sock => undef, + supervisor_sock => undef, + init => 0, + ); + return if $c{init} && !@_; # already checked if we need to spawn - $run_config = $_[0] || do { - my %c = ( - http => $ENV{FU_HTTP} // '127.0.0.1:3000', - fcgi => $ENV{FU_FCGI}, - proc => $ENV{FU_PROC} // 1, - monitor => $ENV{FU_MONITOR}, - max_reqs => $ENV{FU_MAX_REQS}, - ); + %c = (%c, @_, init => 1) if @_ && defined $_[0]; + if (!$c{init}++) { + $c{http} = $ENV{FU_HTTP} // '127.0.0.1:3000'; + $c{fcgi} = $ENV{FU_FCGI}; + $c{proc} = $ENV{FU_PROC} // 1; + $c{monitor} = $ENV{FU_MONITOR}; + $c{max_reqs} = $ENV{FU_MAX_REQS}; debug = 1 if $ENV{FU_DEBUG}; + for (@ARGV) { $c{http} = $1 if /^--http=(.+)$/; $c{fcgi} = $1 if /^--fcgi=(.+)$/; @@ -288,36 +364,47 @@ sub _spawn { $c{max_reqs} = $1 if /^--max-reqs=([0-9]+)$/; debug = 1 if /^--debug$/; debug = 0 if /^--no-debug$/; + $c{listen_sock} = IO::Socket->new_from_fd($1, 'r') if /^--listen-fd=([0-9]+)$/; + $c{client_sock} = IO::Handle->new_from_fd($1, 'r+') if /^--client-fd=([0-9]+)$/; + $c{supervisor_sock} = IO::Handle->new_from_fd($1, 'w') if /^--supervisor-fd=([0-9]+)$/; } - \%c; }; # Single process, no need for a supervisor - my $need_supervisor = $run_config->{proc} > 1 || $run_config->{monitor} || $run_config->{max_reqs}; + my $need_supervisor = !$c{supervisor_sock} && !$c{client_sock} + && ($c{proc} > 1 || $c{monitor} || $c{max_reqs}); return if !@_ && !$need_supervisor; - require IO::Socket; - my $addr = $run_config->{fcgi} || $run_config->{http}; - my $listen = IO::Socket->new( - Listen => 5, - Type => IO::Socket::SOCK_STREAM(), - $addr =~ m{^(unix:|/)(.+)$} ? do { - my $path = ($1 eq '/' ? '/' : '').$2; - unlink $path if -S $path; - +(Domain => IO::Socket::AF_UNIX(), Local => $path) - } : ( - Domain => IO::Socket::AF_INET(), - ReuseAddr => 1, - Proto => 'tcp', - LocalAddr => $addr, - ) - ) or die "Unable to create listen socket: $!\n"; + if (!$c{listen_sock}) { + my $addr = $c{fcgi} || $c{http}; + $c{listen_sock} = IO::Socket->new( + Listen => 5, + Type => IO::Socket::SOCK_STREAM(), + $addr =~ m{^(unix:|/)(.+)$} ? do { + my $path = ($1 eq '/' ? '/' : '').$2; + unlink $path if -S $path; + +(Domain => IO::Socket::AF_UNIX(), Local => $path) + } : ( + Domain => IO::Socket::AF_INET(), + ReuseAddr => 1, + Proto => 'tcp', + LocalAddr => $addr, + ) + ) or die "Unable to create listen socket: $!\n"; + print "Listening on $addr\n" if debug; + } - # TODO: Spawn supervisor - print "Listening on $addr\n" if debug; - - while (my $sock = $listen->accept) { - _do_req $sock; + if ($need_supervisor) { + _supervisor \%c; + } else { + $c{supervisor_sock}->syswrite('r'.pack 'V', $$) if $c{supervisor_sock}; + my $stop = 0; + local $SIG{HUP} = 'IGNORE'; + local $SIG{TERM} = $SIG{INT} = sub { $stop = 1 }; + _do_req \%c if $c{client_sock}; + while (!$stop) { + _do_req \%c if ($c{client_sock} = $c{listen_sock}->accept); + } } } @@ -449,7 +536,7 @@ sub _flush($, $sock) { } $sock->print("\r\n"); - $sock->print($r->{resbody}) if fu->method ne 'HEAD' && $r->{status} != 204; + $sock->print($r->{resbody}) if (fu->method//'') ne 'HEAD' && $r->{status} != 204; $sock->flush; }