This file is indexed.

/usr/share/perl5/Message/Passing/Input/ZeroMQ.pm is in libmessage-passing-zeromq-perl 0.007-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package Message::Passing::Input::ZeroMQ;
use Moo;
use ZeroMQ qw/:all/;
use AnyEvent;
use Scalar::Util qw/ weaken /;
use Try::Tiny qw/ try catch /;
use namespace::clean -except => 'meta';

# Compose the roles this input is built from, in this order: the ZeroMQ
# socket role first, then the generic Message::Passing input role.
with(
    'Message::Passing::ZeroMQ::Role::HasASocket',
    'Message::Passing::Role::Input',
);

# Extend the existing _socket attribute (the '+' prefix amends an attribute
# declared elsewhere) so that $self->_zmq_recv(...) delegates to the
# socket object's recv method.
has '+_socket' => ( handles => { _zmq_recv => 'recv' } );

# Returns the string 'SUB'.
# NOTE(review): presumably read by the HasASocket role as the default
# socket type for this class -- confirm against the role.
sub _socket_type {
    return 'SUB';
}

# Returns 100000.
# NOTE(review): by its name this looks like the builder for a socket_hwm
# attribute declared in the HasASocket role -- confirm against the role.
sub _build_socket_hwm {
    return 100000;
}
# Returns 0.
# NOTE(review): by its name this looks like the builder for a socket_swap
# attribute declared in the HasASocket role -- confirm against the role.
sub _build_socket_swap {
    return 0;
}

# List of subscription values, as an array reference. When the socket type
# is SUB, the setsockopt modifier in this file sets ZMQ_SUBSCRIBE once for
# each element.
has subscribe => (
    # Moo discards the return value of an isa check: a value is rejected
    # only when the sub dies, so a plain boolean expression here would
    # accept anything.
    isa => sub {
        die "subscribe must be an ARRAY reference"
            unless ref($_[0]) eq 'ARRAY';
    },
    is => 'ro',
    lazy => 1,
    default => sub { [ '' ] }, # Subscribe to everything!
);

# Runs after the composed setsockopt method, receiving the same arguments
# (the object and the socket). For a SUB socket, sets ZMQ_SUBSCRIBE on the
# socket once per entry in the subscribe attribute; for any other socket
# type it does nothing.
# NOTE(review): socket_type is not defined in this file; presumably it is
# provided by the HasASocket role.
after setsockopt => sub {
    my ($self, $socket) = @_;

    return unless $self->socket_type eq 'SUB';

    for my $topic (@{ $self->subscribe }) {
        $socket->setsockopt(ZMQ_SUBSCRIBE, $topic);
    }
};

# Make one receive attempt on the socket, passing the ZMQ_NOBLOCK flag.
# If something was received, its ->data is handed to output_to->consume.
# Returns whatever recv returned, so callers can keep calling until the
# result is false.
sub _try_rx {
    my ($self) = @_;

    my $received = $self->_zmq_recv(ZMQ_NOBLOCK);
    $self->output_to->consume($received->data) if $received;

    return $received;
}

# AnyEvent I/O watcher on the file descriptor the socket reports for
# ZMQ_FD. Whenever the watcher fires, messages are received repeatedly
# until _try_rx returns false.
has _io_reader => (
    is => 'ro',
    lazy => 1,
    default => sub {
        my $weak_self = shift;
        # The object stores the watcher and the watcher's callback closes
        # over the object; weaken our copy so that is not a reference cycle.
        weaken($weak_self);
        AE::io $weak_self->_socket->getsockopt( ZMQ_FD ), 0,
            sub {
                # Re-check the weak reference on every pass: it becomes
                # undef if the object is destroyed (possibly as a side
                # effect of consuming a message), and calling a method on
                # undef would die inside the event loop.
                while ($weak_self) {
                    last unless $weak_self->_try_rx;
                }
            };
    },
);

# Note that we need this timer as ZMQ is magic.
# Just checking our local FD for readability will not always
# be enough, as the client end of ZMQ may not start pushing messages to us,
# ergo we call ->recv explicitly on the socket to get messages
# which may be pre-buffered at a client as fast as possible (i.e. before
# the client pushes another message).
# Repeating AnyEvent timer (first fires after 1 second, then every second)
# that receives messages until _try_rx returns false, independently of
# whether the socket's file descriptor has signalled readability.
has _zmq_timer => (
    is => 'ro',
    lazy => 1,
    default => sub {
        my $weak_self = shift;
        # The object stores the timer and the timer's callback closes over
        # the object; weaken our copy so that is not a reference cycle.
        weaken($weak_self);
        AnyEvent->timer(after => 1, interval => 1,
            cb => sub {
                # Re-check the weak reference on every pass: it becomes
                # undef if the object is destroyed (possibly as a side
                # effect of consuming a message), and calling a method on
                # undef would die inside the event loop.
                while ($weak_self) {
                    last unless $weak_self->_try_rx;
                }
            });
    },
);

# Construction hook. Both watcher attributes are declared lazy, so they are
# read here to force them to be built: first the I/O watcher, then the
# timer. Returns the value of the timer attribute.
sub BUILD {
    my ($self) = @_;

    $self->_io_reader;
    return $self->_zmq_timer;
}

1;

=head1 NAME

Message::Passing::Input::ZeroMQ - input messages from ZeroMQ.

=head1 SYNOPSIS

    message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'

=head1 DESCRIPTION

A L<Message::Passing> ZeroMQ input class.

Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
an input with L<Message::Passing::DSL>.

=head1 ATTRIBUTES

See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>

=head2 subscribe

If the input socket is a C<SUB> socket, then the C<ZMQ_SUBSCRIBE>
socket option will be set once for each value in the subscribe attribute.

Defaults to C<['']> (an array reference containing the empty string), which means all messages are subscribed to.

=head1 SEE ALSO

=over

=item L<Message::Passing::ZeroMQ>

=item L<Message::Passing::Output::ZeroMQ>

=item L<Message::Passing>

=item L<ZeroMQ>

=item L<http://www.zeromq.org/>

=back

=head1 SPONSORSHIP

This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API - 
<http://www.surevoip.co.uk/support/wiki/api_documentation>

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing::ZeroMQ>.

=cut