/usr/share/perl5/Message/Passing/Input/ZeroMQ.pm is in libmessage-passing-zeromq-perl 0.007-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | package Message::Passing::Input::ZeroMQ;
use Moo;
use ZeroMQ qw/:all/;
use AnyEvent;
use Scalar::Util qw/ weaken /;
use Try::Tiny qw/ try catch /;
use namespace::clean -except => 'meta';
# Compose the two roles this input class is built from. Judging by their
# names, the first supplies the ZeroMQ socket plumbing and the second the
# generic Message::Passing input contract (confirm against the roles).
with(
    'Message::Passing::ZeroMQ::Role::HasASocket',
    'Message::Passing::Role::Input',
);
# Extend the existing _socket attribute (presumably declared by the
# HasASocket role) with a delegation: calling ->_zmq_recv on this object
# forwards to the socket's ->recv method.
has '+_socket' => (
    handles => { _zmq_recv => 'recv' },
);
# Socket type used by this input: always the string 'SUB'.
sub _socket_type {
    return 'SUB';
}
# Returns 100000. Presumably the builder for the socket high-water-mark
# attribute provided by the socket role (confirm against the role).
sub _build_socket_hwm {
    return 100000;
}
# Returns 0. Presumably the builder for the socket swap-size attribute
# provided by the socket role (confirm against the role).
sub _build_socket_swap {
    return 0;
}
# List of subscription values. When the socket type is 'SUB', each element
# is passed to setsockopt(ZMQ_SUBSCRIBE, ...) once the socket options are
# set. Must be an array reference.
has subscribe => (
    # Moo ignores the return value of an isa check; a value is rejected
    # only when the check dies. Returning a boolean (as a Moose-style
    # constraint would) silently accepts every value, so die explicitly.
    isa => sub {
        die "subscribe must be an ARRAY reference\n"
            unless ref($_[0]) eq 'ARRAY';
    },
    is => 'ro',
    lazy => 1,
    default => sub { [ '' ] }, # Subscribe to everything!
);
# Runs after setsockopt: for a 'SUB' socket, register every entry of the
# subscribe attribute on the socket via ZMQ_SUBSCRIBE. Other socket types
# are left untouched.
after setsockopt => sub {
    my ($self, $socket) = @_;

    return unless $self->socket_type eq 'SUB';

    for my $topic (@{ $self->subscribe }) {
        $socket->setsockopt(ZMQ_SUBSCRIBE, $topic);
    }
};
# Attempt a single receive with the ZMQ_NOBLOCK flag. If something true
# comes back, hand its ->data to the configured output's ->consume.
# Returns whatever the receive call returned, so callers can loop until
# the result is false.
sub _try_rx {
    my ($self) = @_;

    my $received = $self->_zmq_recv(ZMQ_NOBLOCK);
    $self->output_to->consume($received->data) if $received;

    return $received;
}
# Lazily-built AnyEvent I/O watcher on the socket's ZMQ_FD descriptor.
# Whenever the descriptor is reported readable, keep calling _try_rx
# until it returns a false value.
has _io_reader => (
    is      => 'ro',
    lazy    => 1,
    default => sub {
        my ($input) = @_;
        # Hold only a weak reference so the watcher's callback does not
        # keep the object alive.
        weaken($input);

        my $fd = $input->_socket->getsockopt(ZMQ_FD);

        # Second argument 0 => watch for readability rather than writability.
        return AE::io($fd, 0, sub {
            1 while $input->_try_rx;
        });
    },
);
# Note that we need this timer because ZMQ is magic.
# Just checking our local FD for readability will not always
# be enough, as the client end of ZMQ may not start pushing messages to us,
# ergo we call ->recv explicitly on the socket to get messages
# which may be pre-buffered at a client as fast as possible (i.e. before
# the client pushes another message).
# Lazily-built AnyEvent timer that first fires after one second and then
# repeats every second. Each tick calls _try_rx repeatedly until it
# returns a false value.
has _zmq_timer => (
    is      => 'ro',
    lazy    => 1,
    default => sub {
        my ($input) = @_;
        # Hold only a weak reference so the timer's callback does not
        # keep the object alive.
        weaken($input);

        my $poll = sub {
            1 while $input->_try_rx;
        };

        return AnyEvent->timer(after => 1, interval => 1, cb => $poll);
    },
);
# Constructor hook: touch both lazy watcher attributes so the I/O watcher
# and the polling timer are created as soon as the object is built.
sub BUILD {
    my ($self) = @_;

    $self->_io_reader;
    return $self->_zmq_timer;
}
1;
=head1 NAME
Message::Passing::Input::ZeroMQ - input messages from ZeroMQ.
=head1 SYNOPSIS
message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'
=head1 DESCRIPTION
A L<Message::Passing> ZeroMQ input class.
Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
an input with L<Message::Passing::DSL>.
=head1 ATTRIBUTES
See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>
=head2 subscribe
If the input socket is a C<SUB> socket, then the C<ZMQ_SUBSCRIBE>
socket option will be set once for each value in the subscribe attribute.
Defaults to C<['']> (an array reference containing the empty string), which means all messages are subscribed to.
=head1 SEE ALSO
=over
=item L<Message::Passing::ZeroMQ>
=item L<Message::Passing::Output::ZeroMQ>
=item L<Message::Passing>
=item L<ZeroMQ>
=item L<http://www.zeromq.org/>
=back
=head1 SPONSORSHIP
This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API -
<http://www.surevoip.co.uk/support/wiki/api_documentation>
=head1 AUTHOR, COPYRIGHT AND LICENSE
See L<Message::Passing::ZeroMQ>.
=cut
|