/usr/lib/ruby/vendor_ruby/raindrops/aggregate/pmq.rb is in ruby-raindrops 0.19.0-1build2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- encoding: binary -*-
require "tempfile"
require "aggregate"
require "posix_mq"
require "fcntl"
require "thread"
require "stringio"
# \Aggregate + POSIX message queues support for Ruby 1.9+ and \Linux
#
# This class is duck-type compatible with \Aggregate and allows us to
# aggregate and share statistics from multiple processes/threads with
# the aid of POSIX message queues. This is designed to be used with the
# Raindrops::LastDataRecv Rack application, but can be used independently
# on compatible runtimes.
#
# Unlike the core of raindrops, this is only supported on Ruby 1.9+ and
# Linux 2.6+. Using this class requires the following additional RubyGems
# or libraries:
#
# * aggregate (tested with 0.2.2)
# * posix_mq (tested with 1.0.0)
#
# == Design
#
# There is one master thread which aggregates statistics. Individual
# worker processes or threads will write to a shared POSIX message
# queue (default: "/raindrops") that the master reads from. At a
# predefined interval, the master thread will write out to a shared,
# anonymous temporary file that workers may read from.
#
# Setting +:worker_interval+ and +:master_interval+ to +1+ will result
# in perfect accuracy but at the cost of a high synchronization
# overhead. Larger intervals mean less frequent messaging for higher
# performance but lower accuracy.
class Raindrops::Aggregate::PMQ

  # :stopdoc:
  # These constants are for Linux.  This is designed for aggregating
  # TCP_INFO.
  #
  # Each constant is the String argument handed to IO#fcntl by #lock!:
  # the lock type packed as a native short, NUL-padded out to 256 bytes.
  # NOTE(review): presumably this stands in for a zero-filled struct flock
  # (i.e. a whole-file lock) -- confirm against fcntl(2) on the target
  # platform.
  RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256".freeze).freeze
  WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256".freeze).freeze
  UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256".freeze).freeze
  # :startdoc:

  # returns the number of dropped messages sent to a POSIX message
  # queue if non-blocking operation was desired with :lossy
  # (this is only initialized, to zero, when :lossy is set)
  attr_reader :nr_dropped

  #
  # Creates a new Raindrops::Aggregate::PMQ object
  #
  #   Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate
  #
  # +options+ is a hash that accepts the following keys:
  #
  # * :queue - name of the POSIX message queue
  #   (default: ENV["RAINDROPS_MQUEUE"] or "/raindrops")
  # * :worker_interval - number of samples a worker buffers before
  #   sending them to the master (default: 10); a false/nil value sends
  #   every sample immediately
  # * :master_interval - number of messages the master handles between
  #   writes of the shared file (default: 5)
  # * :lossy - workers drop packets if master cannot keep up (default: false)
  # * :aggregate - \Aggregate object (default: \Aggregate.new)
  # * :mq_umask - umask for creating the POSIX message queue (default: 0666)
  # * :mq_attr - passed through as the last argument of POSIX_MQ.new
  #   (default: nil)
  #
  def initialize(params = {})
    opts = {
      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
      :worker_interval => 10,
      :master_interval => 5,
      :lossy => false,
      :mq_attr => nil,
      :mq_umask => 0666,
      :aggregate => Aggregate.new,
    }.merge! params
    @master_interval = opts[:master_interval]
    @worker_interval = opts[:worker_interval]
    @aggregate = opts[:aggregate]
    # samples are only buffered locally when a worker interval is given
    @worker_queue = @worker_interval ? [] : nil
    @mutex = Mutex.new
    @mq_name = opts[:queue]
    mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]

    # two independent descriptors on the same temporary file: one for the
    # master to write snapshots to, one for readers to load them from
    Tempfile.open("raindrops_pmq") do |t|
      @wr = File.open(t.path, "wb")
      @rd = File.open(t.path, "rb")
    end
    @wr.sync = true
    @cached_aggregate = @aggregate
    flush_master

    # pick the sending primitive once, up front
    @mq_send = if opts[:lossy]
      @nr_dropped = 0
      mq.nonblock = true
      mq.method :trysend
    else
      mq.method :send
    end
  end

  # adds a sample to the underlying \Aggregate object
  #
  # With a worker queue the sample is buffered and the whole buffer is
  # sent once it holds +:worker_interval+ samples; otherwise the sample
  # is sent on its own right away.
  def << val
    if q = @worker_queue
      q << val
      if q.size >= @worker_interval
        mq_send(q) or @nr_dropped += 1
        q.clear
      end
    else
      mq_send(val) or @nr_dropped += 1
    end
  end

  # Marshals +val+ and hands it to the configured send method.  The
  # locally cached aggregate is invalidated so the next call to
  # #aggregate reloads it from the shared file.
  def mq_send(val) # :nodoc:
    @cached_aggregate = nil
    @mq_send.call Marshal.dump(val)
  end

  #
  # Starts running a master loop, usually in a dedicated thread or process:
  #
  #   Thread.new { agg.master_loop }
  #
  # Any worker can call +agg.stop_master_loop+ to stop the master loop
  # (possibly causing the thread or process to exit)
  def master_loop
    buf = ""
    a = @aggregate
    nr = 0
    mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
    begin
      # write a snapshot on the first pass and whenever the countdown
      # started from :master_interval runs out
      if (nr -= 1) < 0
        nr = @master_interval
        flush_master
      end
      mq.shift(buf)
      data = begin
        # a false/nil payload is the stop request from #stop_master_loop
        #
        # SECURITY NOTE(review): Marshal.load is applied to whatever was
        # written to the queue; the queue is created with :mq_umask
        # (default 0666), so only use this where every local process able
        # to write to the queue is trusted.
        Marshal.load(buf) or return
      rescue ArgumentError, TypeError
        next # skip messages that cannot be unmarshalled
      end
      Array === data ? data.each { |x| a << x } : a << data
    rescue Errno::EINTR
      # interrupted by a signal, just go around again
    rescue => e
      warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
      break
    end while true
  ensure
    begin
      flush_master
    ensure
      # release the read-side queue descriptor opened above instead of
      # leaving it for the garbage collector
      mq.close if mq
    end
  end

  # Loads the last shared \Aggregate from the master thread/process
  #
  # Pending local samples are flushed to the master first; the result is
  # cached until the next message is sent.
  def aggregate
    @cached_aggregate ||= begin
      flush
      Marshal.load(synchronize(@rd, RDLOCK) do |rd|
        dst = StringIO.new
        dst.binmode
        # copy the whole file starting from offset 0
        IO.copy_stream(rd, dst, rd.size, 0)
        dst.string
      end)
    end
  end

  # Flushes the currently aggregated statistics to a temporary file.
  # There is no need to call this explicitly as +:master_interval+ defines
  # how frequently the master loop writes your data out for workers to read.
  def flush_master
    dump = Marshal.dump @aggregate
    synchronize(@wr, WRLOCK) do |wr|
      wr.truncate 0
      wr.rewind
      wr.write(dump)
    end
  end

  # stops the currently running master loop, may be called from any
  # worker thread or process
  #
  # Keeps retrying (every 0.1s) until the stop message has been accepted
  # by the queue.
  def stop_master_loop
    sleep 0.1 until mq_send(false)
  rescue Errno::EINTR
    retry
  end

  # Applies the fcntl lock described by +type+ to +io+, waiting for it
  # (F_SETLKW) and retrying if interrupted by a signal.
  def lock! io, type # :nodoc:
    io.fcntl Fcntl::F_SETLKW, type
  rescue Errno::EINTR
    retry
  end

  # we use both a mutex for thread-safety and fcntl lock for process-safety
  #
  # Yields +io+ while holding both and returns the value of the block.
  def synchronize io, type # :nodoc:
    @mutex.synchronize do
      begin
        # work on a private, mutable copy of the frozen constant
        type = type.dup
        lock! io, type
        yield io
      ensure
        lock! io, type.replace(UNLOCK)
        type.clear
      end
    end
  end

  # flushes the local queue of the worker process, sending all pending
  # data to the master.  There is no need to call this explicitly as
  # +:worker_interval+ defines how frequently your queue will be flushed
  #
  # Always returns +nil+.
  def flush
    q = @worker_queue
    if q && ! q.empty?
      # account for drops the same way #<< does
      mq_send(q) or @nr_dropped += 1
      q.clear
    end
    nil
  end

  # proxy for \Aggregate#count
  def count; aggregate.count; end

  # proxy for \Aggregate#max
  def max; aggregate.max; end

  # proxy for \Aggregate#min
  def min; aggregate.min; end

  # proxy for \Aggregate#sum
  def sum; aggregate.sum; end

  # proxy for \Aggregate#mean
  def mean; aggregate.mean; end

  # proxy for \Aggregate#std_dev
  def std_dev; aggregate.std_dev; end

  # proxy for \Aggregate#outliers_low
  def outliers_low; aggregate.outliers_low; end

  # proxy for \Aggregate#outliers_high
  def outliers_high; aggregate.outliers_high; end

  # proxy for \Aggregate#to_s
  def to_s(*args); aggregate.to_s(*args); end

  # proxy for \Aggregate#each
  def each; aggregate.each { |*args| yield(*args) }; end

  # proxy for \Aggregate#each_nonzero
  def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end
end
|