/usr/bin/parcat is in parallel 20161222-1.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 | #!/usr/bin/perl
=head1 NAME
parcat - cat files or fifos in parallel
=head1 SYNOPSIS
B<parcat> file(s)
=head1 DESCRIPTION
GNU B<parcat> reads files or fifos in parallel. It writes full lines
so there will be no problem with mixed-half-lines which you risk if
you use:
(cat file1 & cat file2 &) | ...
=head1 EXAMPLES
=head2 Do be done
mkfifo slot-{1..5}-digit-{0..9}
parallel -j5 'seq 100000 | grep {} > slot-{%}-digit-{}' ::: {0..9} &
parallel parcat slot-{1..5}-digit-{} '>' digit-{} ::: {0..9}
=head1 REPORTING BUGS
GNU B<parcat> is part of GNU B<parallel>. Report bugs to <bug-parallel@gnu.org>.
=head1 AUTHOR
Copyright (C) 2016 Ole Tange, http://ole.tange.dk and Free Software
Foundation, Inc.
=head1 LICENSE
Copyright (C) 2007,2008,2009,2010,2011 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
at your option any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=head2 Documentation license I
Permission is granted to copy, distribute and/or modify this documentation
under the terms of the GNU Free Documentation License, Version 1.3 or
any later version published by the Free Software Foundation; with no
Invariant Sections, with no Front-Cover Texts, and with no Back-Cover
Texts. A copy of the license is included in the file fdl.txt.
=head2 Documentation license II
You are free:
=over 9
=item B<to Share>
to copy, distribute and transmit the work
=item B<to Remix>
to adapt the work
=back
Under the following conditions:
=over 9
=item B<Attribution>
You must attribute the work in the manner specified by the author or
licensor (but not in any way that suggests that they endorse you or
your use of the work).
=item B<Share Alike>
If you alter, transform, or build upon this work, you may distribute
the resulting work only under the same, similar or a compatible
license.
=back
With the understanding that:
=over 9
=item B<Waiver>
Any of the above conditions can be waived if you get permission from
the copyright holder.
=item B<Public Domain>
Where the work or any of its elements is in the public domain under
applicable law, that status is in no way affected by the license.
=item B<Other Rights>
In no way are any of the following rights affected by the license:
=over 9
=item *
Your fair dealing or fair use rights, or other applicable
copyright exceptions and limitations;
=item *
The author's moral rights;
=item *
Rights other persons may have either in the work itself or in
how the work is used, such as publicity or privacy rights.
=back
=item B<Notice>
For any reuse or distribution, you must make clear to others the
license terms of this work.
=back
A copy of the full license is included in the file as cc-by-sa.txt.
=head1 DEPENDENCIES
GNU B<parcat> uses Perl.
=head1 SEE ALSO
B<cat>(1), B<parallel>(1)
=cut
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use IO::Select;
use strict;
use threads;
use threads::shared;
use Thread::Queue;
my $opened :shared;
my $q = Thread::Queue->new();
my $okq = Thread::Queue->new();
my @producers;
if(not @ARGV) {
print "Usage:\n";
print " parcat file(s)\n";
}
for (@ARGV) {
push @producers, threads->create('producer', $_);
}
sub producer {
# Open a file/fifo, set non blocking, enqueue fileno of the file handle
my $file = shift;
open(my $fh, "<", $file) || do {
print STDERR "parcat: Cannot open $file\n";
exit(1);
};
set_fh_non_blocking($fh);
$q->enqueue(fileno($fh));
$opened++;
# Get an OK that the $fh is opened and we can release the $fh
while(1) {
my $ok = $okq->dequeue();
if($ok == fileno($fh)) { last; }
# Not ours - very unlikely to happen
$okq->enqueue($ok);
}
return;
}
my $s = IO::Select->new();
my %buffer;
sub add_file {
my $fd = shift;
open(my $fh, "<&=", $fd) || die;
$s->add($fh);
# Tell the producer now opened here and can be released
$okq->enqueue($fd);
# Initialize the buffer
@{$buffer{$fh}} = ();
}
sub add_files {
# Non-blocking dequeue
while(defined(my $fd = $q->dequeue_nb())) {
add_file($fd);
}
}
sub add_files_block {
# Blocking dequeue
my $fd = $q->dequeue();
add_file($fd);
}
my $fd;
my (@ready,$file,$rv,$buf);
do {
# Wait until at least one file is opened
add_files_block();
while($q->pending or keys %buffer) {
add_files();
while(keys %buffer) {
@ready = $s->can_read(0.01);
if(not @ready) {
add_files();
}
for $file (@ready) {
$rv = sysread($file, $buf, 65536);
if (!$rv) {
if($! == EAGAIN) {
# Would block: Nothing read
next;
} else {
# This file is done
$s->remove($file);
print @{$buffer{$file}};
delete $buffer{$file};
# Closing the $file causes it to block
# close $file;
add_files();
next;
}
}
# Find \n for full line
my $i = (rindex($buf,"\n")+1);
if($i) {
# Print full line
for(@{$buffer{$file}}, substr($buf,0,$i)) {
syswrite(STDOUT,$_);
}
# @buffer = remaining half line
@{$buffer{$file}} = (substr($buf,$i,$rv-$i));
redo;
} else {
# Something read, but not a full line
push @{$buffer{$file}}, $buf;
redo;
}
}
}
}
} while($opened <= $#ARGV);
for (@producers) {
$_->join();
}
sub set_fh_non_blocking {
# Set filehandle as non-blocking
# Inputs:
# $fh = filehandle to be blocking
# Returns:
# N/A
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
|