My original solution for https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top saves a copy of the data on disk.
Now I made a second version that buffers a string in memory.
It works, but it needs to have all the fifs connected before it starts. It works:
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: *
This gives no output (because 100 is not connected):
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: {1..99}
I tried to use open '+<'. This solves the above problem, but now it does not stop at EOF.
How to do it?
Minimum version (does not support large lines and does not require delay):
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);
for (@ARGV) {
open($fh{$_},"<",$_) || die;
my $flags;
fcntl($fh{$_}, &F_GETFL, $flags) || die $!;
$flags |= &O_NONBLOCK;
fcntl($fh{$_}, &F_SETFL, $flags) || die $!;
}
while(keys %fh) {
for(keys %fh) {
my($string,$something_read) = non_blocking_read($_);
print $string;
}
select(undef, undef, undef, 1/1000);
}
{
my %buffer;
sub non_blocking_read {
my $file = shift;
my $in = $fh{$file};
my $rv = sysread($in, substr($buffer{$file},length $buffer{$file}), 327680);
if (!$rv) {
if($! == EAGAIN) {
return(undef,undef);
} else {
close $in;
delete $fh{$file};
my $buf = $buffer{$file};
delete $buffer{$file};
return ($buf,1);
}
}
my $i = (rindex($buffer{$file},"\n")+1);
if($i) {
return(substr($buffer{$file},0,$i),
1,substr($buffer{$file},0,$i) = "");
} else {
return("",1);
}
}
}
Full version: important code is in the first 40 lines: the rest is well-tested code.
use Symbol qw(gensym);
use IPC::Open3;
for (@ARGV) {
open($fh{$_},"<",$_) || die;
set_fh_non_blocking($fh{$_});
}
$ms = 1;
while(keys %fh) {
for(keys %fh) {
my($string,$something_read) = non_blocking_read($_);
if($something_read) {
$ms = 0.1;
print $string;
}
}
$ms = exp_usleep($ms);
}
{
my %buffer;
my $ms;
sub non_blocking_read {
use POSIX qw(:errno_h);
my $file = shift;
my $in = $fh{$file};
my $rv = read($in, substr($buffer{$file},length $buffer{$file}), 327680);
if (!$rv) {
if($! == EAGAIN) {
return(undef,undef);
} else {
close $in;
delete $fh{$file};
my $buf = $buffer{$file};
delete $buffer{$file};
return ($buf,1);
}
}
my $i = (::rindex64(\$buffer{$file},"\n")+1) ||
(::rindex64(\$buffer{$file},"\r")+1);
if($i) {
return(substr($buffer{$file},0,$i),
1,substr($buffer{$file},0,$i) = "");
} else {
return("",1);
}
}
}
sub rindex64 {
my $ref = shift;
my $match = shift;
my $pos = shift;
my $block_size = 2**31-1;
my $strlen = length($$ref);
$pos = defined $pos ? $pos : $strlen;
if($strlen < $block_size) {
return rindex($$ref, $match, $pos);
}
my $matchlen = length($match);
my $ret;
my $offset = $pos - $block_size + $matchlen;
if($offset < 0) {
$block_size = $block_size + $offset;
$offset = 0;
}
while($offset >= 0) {
$ret = rindex(
substr($$ref, $offset, $block_size),
$match);
if($ret != -1) {
return $ret + $offset;
}
$offset -= ($block_size - $matchlen - 1);
}
return -1;
}
sub exp_usleep {
my $ms = shift;
select(undef, undef, undef, $ms/1000);
return (($ms < 1000) ? ($ms * 1.1) : ($ms));
}
sub set_fh_non_blocking {
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!;
$flags |= &O_NONBLOCK;
fcntl($fh, &F_SETFL, $flags) || die $!;
}