# ==================================================================
# Gossamer Threads Module Library - http://gossamer-threads.com/
#
#   GT::IPC::Run::Unix
#   Author  : Scott Beck
#   $Id: Unix.pm,v 1.24 2004/02/17 01:33:07 jagerman Exp $
# 
# Copyright (c) 2004 Gossamer Threads Inc.  All Rights Reserved.
# ==================================================================

package GT::IPC::Run::Unix;

use strict;
use vars qw/$EVENTS $ERROR_MESSAGE/;
use base 'GT::Base';

use IO::Select;
use POSIX qw(fcntl_h errno_h :sys_wait_h);

# Maximum number of bytes requested per sysread call when draining a
# child's stdout/stderr handle.
sub READ_BLOCK () { 512 }

# NOTE: this assignment happens at run time and replaces the @ISA that
# "use base 'GT::Base'" populated at compile time, so the class inherits
# from GT::IPC::Run only (GT::Base presumably arrives through GT::IPC::Run
# -- confirm).
@GT::IPC::Run::Unix::ISA = qw(GT::IPC::Run);
# NOTE(review): presumably names the package whose error messages GT::Base
# should use for this class -- confirm against GT::Base.
$ERROR_MESSAGE = 'GT::IPC::Run';

sub execute {
# ------------------------------------------------------------------------
# Forks the child described by $self->{current_child}, connects the
# child's STDIN/STDOUT/STDERR to the handles stored on that child object
# and runs its program.
#
# In the parent: registers the child's stdout/stderr read handles with
# $self->{select}, stores the child in $self->{children}{$pid}, writes a
# scalar-ref stdin (if one was given) to the child and closes it, waits
# until the child reports that its stdio is set up, and returns the pid.
#
# In the child: never returns.  An array ref or plain string program is
# exec'd, a code ref is called; afterwards the process leaves through
# POSIX::_exit.
#
# Dies if the semaphore pipe cannot be created.  Calls
# $self->fatal(FORK => ...) if fork fails.
#
    my ($self) = @_;

    # Children are reaped by polling waitpid() in do_one_loop, so no SIGCHLD
    # handler is installed here.

    # Create a semaphore pipe.  This is used so that the parent doesn't
    # begin listening until the child's stdio has been set up.
    my ($child_pipe_read, $child_pipe_write) = $self->oneway;
    die "Could not create semaphore socket: $!" unless defined $child_pipe_read;

    my $pid = fork;
    unless (defined $pid) {
        $self->fatal(FORK => "$!");

        # Only reached if fatal() returns: never fall through into the
        # child code path while still being the parent process.
        return;
    }

    if ($pid) { # Parent
        my $child = delete $self->{current_child};

        # A freshly forked pid cannot have been reaped yet; drop any stale
        # record left behind by an earlier child that had the same pid.
        delete $self->{goners}{$pid};

        $self->{select}->add_stdout($pid => $child->stdout_read);
        $self->{select}->add_stderr($pid => $child->stderr_read);
        $self->{children}{$pid} = $child;
        $child->pid($pid);
        if ($child->stdin and ref($child->stdin) eq 'SCALAR') {
            print {$child->stdin_write} ${$child->stdin};
            close $child->stdin_write;
        }

        # Kludge to stop speed forking
        select undef, undef, undef, 0.001;

        # NOTE(review): the original deliberately left these closes disabled;
        # confirm the reason before re-enabling them.
#        close $child->stdout_write if $child->stdout_write;
#        close $child->stderr_write if $child->stderr_write;
#        close $child->stdin_read   if $child->stdin_read;

        # Close our copy of the write end *before* waiting.  Otherwise a
        # child that dies before printing "go" can never produce EOF on the
        # read end and this read would block forever.
        close $child_pipe_write;
        <$child_pipe_read>;
        close $child_pipe_read;
        return $pid;
    }

    # Child
    $self->debug("Forked: $$\n") if $self->{_debug} > 1;

    # Get our child object and our filenos
    my $child = delete $self->{current_child};
    my ($stdout_fn, $stderr_fn, $stdin_fn);
    $stdout_fn = fileno($child->stdout_write) if $child->stdout_write;
    $stderr_fn = fileno($child->stderr_write) if $child->stderr_write;
    $stdin_fn  = fileno($child->stdin_read)   if $child->stdin_read;

    # NOTE(review): closing the unused ends was disabled in the original;
    # kept disabled here.
#    close $child->stdin_write if $child->stdin_write;
#    close $child->stderr_read if $child->stderr_read;
#    close $child->stdout_read if $child->stdout_read;

    # Tied handles break the redirection below
    untie *STDOUT if tied *STDOUT;
    untie *STDERR if tied *STDERR;
    untie *STDIN  if tied *STDIN;

    # Redirect STDOUT to the write end of the stdout pipe.
    if (defined $stdout_fn) {
        $child->debug("Opening stdout to fileno $stdout_fn") if $child->{_debug};
        open( STDOUT, ">&$stdout_fn" )
            or die "can't redirect stdout in child pid $$: $!";
        $child->debug("stdout opened") if $child->{_debug};
    }

    # Redirect STDIN from the read end of the stdin pipe.
    if (defined $stdin_fn) {
        $child->debug("Opening stdin to fileno $stdin_fn") if $child->{_debug};
        open( STDIN, "<&$stdin_fn" )
            or die "can't redirect STDIN in child pid $$: $!";
        $child->debug("stdin opened") if $child->{_debug};
    }

    # Redirect STDERR to the write end of the stderr pipe.
    if (defined $stderr_fn) {
        $child->debug("Opening stderr to fileno $stderr_fn") if $child->{_debug};
        open( STDERR, ">&$stderr_fn" )
            or die "can't redirect stderr in child: $!";
    }

    # Unbuffer both output handles; STDOUT is left as the selected handle.
    select STDERR;  $| = 1;
    select STDOUT;  $| = 1;

    # Tell the parent that the stdio has been set up.
    close $child_pipe_read;
    print $child_pipe_write "go\n";
    close $child_pipe_write;

    # Program code here
    my $program = $child->program;
    if (ref($program) eq 'CODE') {
        $? = 0;
        $program->();

        # In case flushing them wasn't good enough.
        close STDOUT if defined fileno(STDOUT);
        close STDERR if defined fileno(STDERR);

        # _exit skips END blocks and destructors inherited from the parent;
        # the kill is a last resort should _exit be unavailable.
        eval { POSIX::_exit($?); };
        eval { kill KILL => $$; };
    }
    else {
        # exec treats a one-element list exactly like a single scalar, so
        # the array-ref and plain-string forms can share one call.
        my @command = ref($program) eq 'ARRAY' ? @$program : ($program);
        exec(@command) or do {
            # exec does not set $?, so exiting with it would report whatever
            # stale status was inherited across the fork (often 0, i.e.
            # success).  Follow the shell convention instead: 127 when the
            # command was not found, 126 when it could not be executed.
            # $! is sampled before print can disturb it.
            my $status = ($! == ENOENT) ? 127 : 126;
            print STDERR "can't exec (@command) in child pid $$:$!\n";
            eval { POSIX::_exit($status); };
            eval { kill KILL => $$; };
        };
    }
    die "How did I get here!";
}

sub put {
# ------------------------------------------------------------------------
# Writes the given data to the stdin write handle of the child registered
# under $pid.  Returns whatever print returns.
#
    my ($self, $pid, @data) = @_;

    my $stdin_handle = $self->{children}{$pid}->stdin_write;
    return print {$stdin_handle} @data;
}

sub do_one_loop {
# ------------------------------------------------------------------------
# Runs a single pass of the event loop.
#
#   $wait - timeout handed to the select object's can_read; defaults to
#           0.05 when not defined.
#
# Each pass reaps exited children without blocking, calls each reaped
# child's exit callback once, calls its done callback once it has exited
# and has no output pending, and then reads at most READ_BLOCK bytes from
# every handle reported readable, passing the data to the child's handler.
#
# Returns 1 while there is (or may be) more to do.  Returns 0 once nothing
# is pending and every tracked child has been reaped; the remaining
# children are released through flush_filters before returning.
#
    my ($self, $wait) = @_;
    $wait = 0.05 unless defined $wait;

    # See if any children have exited.  NOTE: waitpid(-1) also reaps
    # children this object does not track; their status is discarded.
    my $child;
    while (($child = waitpid(-1, WNOHANG)) > 0) {
        next unless exists $self->{children}{$child};
        $self->{goners}{$child} = $?;
        $self->{children}{$child}->exit_status($?);
        $self->debug(
            "forked child $child exited with exit status (".
            ($self->{goners}{$child} >> 8).
            ")\n"
        ) if $self->{_debug};
    }

    # Fire the exit callback once for every reaped child we still track.
    for my $pid (keys %{$self->{goners}} ) {
        my $child = $self->{children}{$pid} or next;
        if (!$child->called_reaper) {
            $child->exit_callback->($pid, $self->{goners}{$pid})
                if $child->exit_callback;
            $child->called_reaper(1);
        }
    }
    my ($stdout_pending, $stderr_pending) = $self->{select}->can_read($wait);

    # Children with no readable handle in this pass.
    my %not_pending = %{$self->{children}};
    for my $pid (@$stdout_pending, @$stderr_pending) {
        delete $not_pending{$pid};
    }

    # A child is "done" once it has exited and has nothing left to read.
    for my $pid (keys %{$self->{goners}}) {
        my $child = $self->{children}{$pid} or next;
        if ($not_pending{$pid} and not $child->called_done) {
            $child->done_callback->($pid, $self->{goners}{$pid})
                if $child->done_callback;
            $child->called_done(1);
        }
    }

    if (!@$stdout_pending and !@$stderr_pending) {
        $self->debug("Nothing else to do, flushing buffers")
            if $self->{_debug};
        $self->debug(
            "Children: ".
            keys(%{$self->{children}}).
            "; goners: ".
            keys(%{$self->{goners}})
        ) if $self->{_debug};

        # We still have children out there.  Look for tracked children that
        # have not been reaped rather than comparing hash sizes: goners can
        # hold pids that are no longer in children (flush_filters removes
        # children only), and a size comparison would then report "all done"
        # while a child is still running.
        return 1
            if grep { !exists $self->{goners}{$_} } keys %{$self->{children}};

        # Flush output filters and delete children to free memory and FDs
        $self->flush_filters;

        # Nothing left to do
        return 0;
    }

    # else we have stuff to do
    for my $pid (@$stdout_pending) {
        $self->_read_pending($pid, 'stdout');
    }
    for my $pid (@$stderr_pending) {
        $self->_read_pending($pid, 'stderr');
    }
    return 1;
}

sub _read_pending {
# ------------------------------------------------------------------------
# Reads one block from the $stream ('stdout' or 'stderr') read handle of
# the child registered under $pid and passes it to that stream's handler,
# if the child has one.
#
    my ($self, $pid, $stream) = @_;
    my $child = $self->{children}{$pid};
    my $label = uc $stream;
    $self->debug("$label pending for $pid") if $self->{_debug};

    my $reader = "${stream}_read";
    my $ret = sysread($child->$reader(), my $buff, READ_BLOCK);
    if (!defined $ret) {
        # sysread failed.  EAGAIN only means nothing was ready after all;
        # anything else is reported when debugging.
        if ($! != EAGAIN) {
            $self->debug(
                "$pid: Socket Read: $!: $^E; Errno: ", 0+$!
            ) if $self->{_debug};
        }
    }
    elsif ($ret) {
        # Process callbacks
        $self->debug("[$pid $label]: `$buff'\n")
            if $self->{_debug} > 1;
        my $handler = "handler_$stream";
        if ($child->$handler()) {
            $child->$handler()->put(\$buff);
        }
    }
    # A return of 0 is end-of-file: there is nothing to deliver, and $! is
    # not meaningful, so it is not reported as an error.
}

sub flush_filters {
# ------------------------------------------------------------------------
# Releases every tracked child: removes it from $self->{children},
# unregisters its stdout/stderr entries from the select object and calls
# flush on each of its output handlers that is set.
#
    my ($self) = @_;

    foreach my $pid (keys %{ $self->{children} }) {
        my $finished = delete $self->{children}{$pid};

        # Stop watching this child's output handles.
        $self->select->remove_stdout($pid);
        $self->select->remove_stderr($pid);

        # stdout handler first, then stderr.
        foreach my $accessor (qw(handler_stdout handler_stderr)) {
            $finished->$accessor()->flush if $finished->$accessor();
        }
    }
}

sub stop_blocking {
# ------------------------------------------------------------------------
# Puts $socket_handle into non-blocking mode by adding O_NONBLOCK to its
# file status flags.  Dies with "getfl: ..." or "setfl: ..." when the
# corresponding fcntl call fails.  Returns the true result of F_SETFL.
#
    my $self          = shift;
    my $socket_handle = shift;

    # fcntl yields "0 but true" for a zero result, so falsity means failure.
    my $current = fcntl($socket_handle, F_GETFL, 0);
    die "getfl: $!" unless $current;

    my $result = fcntl($socket_handle, F_SETFL, $current | O_NONBLOCK);
    die "setfl: $!" unless $result;

    return $result;
}

sub start_blocking {
# ------------------------------------------------------------------------
# Puts $socket_handle back into blocking mode by clearing O_NONBLOCK from
# its file status flags.  Dies with "getfl: ..." or "setfl: ..." when the
# corresponding fcntl call fails.  Returns the true result of F_SETFL.
#
    my $self          = shift;
    my $socket_handle = shift;

    # fcntl yields "0 but true" for a zero result, so falsity means failure.
    my $current = fcntl($socket_handle, F_GETFL, 0);
    die "getfl: $!" unless $current;

    my $result = fcntl($socket_handle, F_SETFL, $current & ~O_NONBLOCK);
    die "setfl: $!" unless $result;

    return $result;
}


1;
