/usr/share/perl5/IO/Async/Channel.pm is in libio-async-perl 0.45-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | # You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2011 -- leonerd@leonerd.org.uk
package IO::Async::Channel;
use strict;
use warnings;
use base qw( IO::Async::Notifier ); # just to get _capture_weakself
our $VERSION = '0.45';
use Carp;
use Storable qw( freeze thaw );
sub new
{
my $class = shift;
return bless {
mode => undef,
}, $class;
}
sub send
{
my $self = shift;
my ( $data ) = @_;
my $record = freeze $data;
$self->send_frozen( $record );
}
sub send_frozen
{
my $self = shift;
my ( $record ) = @_;
my $bytes = pack( "I", length $record ) . $record;
defined $self->{mode} or die "Cannot ->send without being set up";
return $self->_send_sync( $bytes ) if $self->{mode} eq "sync";
return $self->_send_async( $bytes ) if $self->{mode} eq "async";
}
sub close
{
my $self = shift;
return $self->_close_sync if $self->{mode} eq "sync";
return $self->_close_async if $self->{mode} eq "async";
}
sub setup_sync_mode
{
my $self = shift;
( $self->{fh} ) = @_;
$self->{mode} = "sync";
# Since we're communicating binary structures and not Unicode text we need to
# enable binmode
binmode $self->{fh};
$self->{fh}->autoflush(1);
}
sub _read_exactly
{
$_[1] = "";
while( length $_[1] < $_[2] ) {
my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
defined $n or return undef;
$n or return "";
}
return $_[2];
}
sub recv
{
my $self = shift;
$self->{mode} eq "sync" or die "Needs to be in synchronous mode";
my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
defined $n or die "Cannot read - $!";
length $n or return undef;
my $len = unpack( "I", $lenbuffer );
$n = _read_exactly( $self->{fh}, my $record, $len );
defined $n or die "Cannot read - $!";
length $n or return undef;
return thaw $record;
}
sub _send_sync
{
my $self = shift;
my ( $bytes ) = @_;
$self->{fh}->print( $bytes );
}
sub _close_sync
{
my $self = shift;
$self->{fh}->close;
}
sub setup_async_mode
{
my $self = shift;
my %args = @_;
my $stream = delete $args{stream} or croak "Expected 'stream'";
if( my $on_recv = delete $args{on_recv} ) {
$stream->configure( on_read => $self->_capture_weakself( '_on_stream_read' ) );
$self->{on_recv} = $on_recv;
$self->{on_eof} = delete $args{on_eof};
}
keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
$self->{stream} = $stream;
$self->{mode} = "async";
$stream->configure( autoflush => 1 );
}
sub _send_async
{
my $self = shift;
my ( $bytes ) = @_;
$self->{stream}->write( $bytes );
}
sub _close_async
{
my $self = shift;
$self->{stream}->close_when_empty;
}
sub _on_stream_read
{
my $self = shift;
my ( $stream, $buffref, $eof ) = @_;
if( $eof ) {
$self->{on_eof}->( $self );
return;
}
return 0 unless length( $$buffref ) >= 4;
my $len = unpack( "I", $$buffref );
return 0 unless length( $$buffref ) >= 4 + $len;
my $record = thaw( substr( $$buffref, 4, $len ) );
substr( $$buffref, 0, 4 + $len ) = "";
$self->{on_recv}->( $self, $record );
return 1;
}
0x55AA;
|