This file is indexed.

/usr/share/pyshared/rgain/rgcalc.py is in python-rgain 1.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# -*- coding: utf-8 -*-
# 
# Copyright (c) 2009-2013 Felix Krull <f_krull@gmx.de>
# 
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""Replay Gain analysis using GStreamer. See ``ReplayGain`` class for full
documentation or use the ``calculate`` function.
"""

import os.path

import gi
# Request version 1.0 of the Gst bindings ahead of the ``gi.repository``
# import on the next line.
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

# Project-local helpers: the result container, the error wrapper emitted on
# pipeline failures and the signal-connection utilities used by ``calculate``.
from rgain import GainData, GSTError, util

# Make sure GObject threads don't crash
GObject.threads_init()

def to_utf8(string):
    """Return ``string`` as a UTF-8 encoded byte string.

    Unicode input is encoded directly. Any other input is first decoded as
    UTF-8 and then encoded again, so a byte string that is not valid UTF-8
    fails at the decode step instead of being passed through.
    """
    text = string if isinstance(string, unicode) else string.decode("utf-8")
    return text.encode("utf-8")

class ReplayGain(GObject.GObject):

    """Run a Replay Gain analysis over a list of files.

    Nothing is written back to the files: this class only measures them and
    hands the numbers to whoever is listening.

    Typical usage:
     - create an instance from a list of file names (optionally with the
       reference loudness level, which defaults to 89),
     - connect handlers to the signals listed below,
     - call ``start()``,
     - run a glib main loop (e.g. ``GObject.MainLoop`` or the GTK one) so
       that the pipeline's bus messages get dispatched.

    Signals:
     - ``track-started(filename)``: a file was handed to the pipeline,
     - ``track-finished(filename, gain_data)``: end of stream was reached
       for a file,
     - ``all-finished(track_data, album_data)``: no files are left,
     - ``error(gst_error)``: the pipeline reported an error; the argument
       is a ``GSTError``.

    Results are collected in ``track_data`` (a dict mapping each file name
    to a ``GainData`` instance) and ``album_data`` (a single ``GainData``
    instance). The stored numbers are bare values without any unit.
    """

    __gsignals__ = {
        # (track_data, album_data)
        "all-finished": (
            GObject.SIGNAL_RUN_LAST,
            GObject.TYPE_NONE,
            (GObject.TYPE_PYOBJECT, GObject.TYPE_PYOBJECT),
        ),
        # (filename,)
        "track-started": (
            GObject.SIGNAL_RUN_LAST,
            GObject.TYPE_NONE,
            (GObject.TYPE_STRING,),
        ),
        # (filename, gain_data)
        "track-finished": (
            GObject.SIGNAL_RUN_LAST,
            GObject.TYPE_NONE,
            (GObject.TYPE_STRING, GObject.TYPE_PYOBJECT),
        ),
        # (gst_error,)
        "error": (
            GObject.SIGNAL_RUN_LAST,
            GObject.TYPE_NONE,
            (GObject.TYPE_PYOBJECT,),
        ),
    }

    def __init__(self, files, force=False, ref_lvl=89):
        """Store the arguments and build the analysis pipeline.

        ``files`` must support both ``len()`` and iteration. ``ref_lvl`` is
        handed to the analysis element as its reference level. ``force`` is
        only stored on the instance.
        """
        GObject.GObject.__init__(self)
        self.files = files
        self.force = force
        self.ref_lvl = ref_lvl

        # Build the pipeline first, then configure the analysis element.
        self._setup_pipeline()
        self._setup_rg_elem()

        # Files are consumed one at a time through this iterator.
        self._files_iter = iter(self.files)

        # Per-track results keyed by file name, plus the album-wide result.
        self.track_data = {}
        self.album_data = GainData(0)

    def start(self):
        """Load the first file and set the pipeline playing.

        A GObject main loop (e.g. the Gtk one) has to be running for the
        analysis to make progress, since results arrive as bus messages.

        Raises ``ValueError`` if there is no file to process.
        """
        if not self._next_file():
            raise ValueError("do never, ever run this thing without any files")
        self.pipe.set_state(Gst.State.PLAYING)

    def pause(self, pause):
        """Pause the pipeline if ``pause`` is true, resume it otherwise."""
        target = Gst.State.PAUSED if pause else Gst.State.PLAYING
        self.pipe.set_state(target)

    def stop(self):
        """Put the pipeline into the NULL state."""
        self.pipe.set_state(Gst.State.NULL)

    # ---- internals -------------------------------------------------------

    def _setup_pipeline(self):
        """Create all elements, wire them up and watch the pipeline's bus."""
        pipeline = Gst.Pipeline()
        self.pipe = pipeline

        # (attribute / element name, factory name), in creation order. Each
        # element is stored on ``self`` under the same name it gets inside
        # the pipeline.
        element_specs = (
            ("src", "filesrc"),
            ("decbin", "decodebin"),
            ("conv", "audioconvert"),
            ("res", "audioresample"),
            ("rg", "rganalysis"),
            ("sink", "fakesink"),
        )
        for name, factory in element_specs:
            element = Gst.ElementFactory.make(factory, name)
            setattr(self, name, element)
            pipeline.add(element)

        # Tell rganalysis how many tracks to expect so that all files are
        # treated as one album. Fixes #8.
        self.rg.set_property("num-tracks", len(self.files))

        # Static links. decodebin's output is connected later, from the
        # "pad-added" handler.
        static_links = (
            (self.src, self.decbin),
            (self.conv, self.res),
            (self.res, self.rg),
            (self.rg, self.sink),
        )
        for upstream, downstream in static_links:
            upstream.link(downstream)

        self.decbin.connect("pad-added", self._on_pad_added)
        self.decbin.connect("pad-removed", self._on_pad_removed)

        message_bus = pipeline.get_bus()
        message_bus.add_signal_watch()
        message_bus.connect("message", self._on_message)

    def _setup_rg_elem(self):
        """Configure the rganalysis element."""
        # 'forced' is always switched on; there is no way to specify it from
        # outside, as that is usually useless. The ``force`` constructor
        # argument is not consulted here.
        settings = (
            ("forced", True),
            ("reference-level", self.ref_lvl),
        )
        for prop, value in settings:
            self.rg.set_property(prop, value)

    def _next_file(self):
        """Point the source element at the next file to analyze.

        Returns True if a file was loaded. Returns False, after emitting
        ``all-finished``, when the file iterator is exhausted and the
        pipeline should not be started again.
        """
        try:
            upcoming = next(self._files_iter)
        except StopIteration:
            self.emit("all-finished", self.track_data, self.album_data)
            return False

        encoded = to_utf8(upcoming)
        self.src.set_property("location", encoded)
        # The name is remembered as it was given; it is the key used for
        # ``track_data``.
        self._current_file = upcoming
        self.emit("track-started", encoded)
        return True

    def _process_tags(self, msg):
        """Copy Replay Gain values from a tag message into the result data."""
        found_tags = msg.parse_tag()
        per_track = self.track_data.setdefault(self._current_file,
                                               GainData(0))

        # tag name -> (object receiving the value, attribute to set)
        destinations = {
            Gst.TAG_TRACK_GAIN: (per_track, "gain"),
            Gst.TAG_TRACK_PEAK: (per_track, "peak"),
            Gst.TAG_REFERENCE_LEVEL: (per_track, "ref_level"),
            Gst.TAG_ALBUM_GAIN: (self.album_data, "gain"),
            Gst.TAG_ALBUM_PEAK: (self.album_data, "peak"),
        }

        def store_tag(taglist, tag, userdata):
            destination = destinations.get(tag)
            if destination is None:
                # Not one of the tags we are interested in.
                return
            holder, attribute = destination
            _, value = taglist.get_double(tag)
            setattr(holder, attribute, value)

        found_tags.foreach(store_tag, None)

    # ---- event handlers --------------------------------------------------

    def _on_pad_added(self, decbin, new_pad):
        """Link a pad that appeared on decodebin to the converter, if able."""
        target = self.conv.get_compatible_pad(new_pad, None)
        if target is None:
            return
        new_pad.link(target)

    def _on_pad_removed(self, decbin, old_pad):
        """Unlink a pad that disappeared from decodebin from its peer."""
        linked_to = old_pad.get_peer()
        if linked_to is None:
            return
        old_pad.unlink(linked_to)

    def _on_message(self, bus, msg):
        """React to tag, error and end-of-stream messages from the bus."""
        kind = msg.type

        if kind == Gst.MessageType.TAG:
            self._process_tags(msg)
            return

        if kind == Gst.MessageType.ERROR:
            self.pipe.set_state(Gst.State.NULL)
            gerror, debug_info = msg.parse_error()
            self.emit("error", GSTError(gerror, debug_info))
            return

        if kind == Gst.MessageType.EOS:
            finished = self._current_file
            self.emit("track-finished", to_utf8(finished),
                      self.track_data[finished])

            # Lock rganalysis before resetting the pipeline so that its
            # state is preserved from one file to the next.
            self.rg.set_locked_state(True)
            self.pipe.set_state(Gst.State.NULL)

            have_more = self._next_file()
            if have_more:
                self.pipe.set_state(Gst.State.PLAYING)
                # For some reason, GStreamer 1.0's rganalysis element
                # produces an error here unless a flush has been performed.
                src_pad = self.rg.get_static_pad("src")
                src_pad.send_event(Gst.Event.new_flush_start())
                src_pad.send_event(Gst.Event.new_flush_stop(True))

            self.rg.set_locked_state(False)


def calculate(*args, **kwargs):
    """Run a complete analysis and return its results.

    Convenience wrapper around the ``ReplayGain`` class: every argument is
    forwarded to its constructor, a private main loop is run until the
    analysis finishes or fails, and the tuple ``(track_data, album_data)``
    is returned. If the ``error`` signal fires, the object it carried is
    raised once the loop has stopped.
    """
    # Mutable slot shared with the error callback below.
    outcome = {"error": None}

    def on_finished(evsrc, trackdata, albumdata):
        # Nothing left to analyze.
        loop.quit()

    def on_error(evsrc, exc):
        # Remember the failure; it is raised after the loop has returned.
        outcome["error"] = exc
        loop.quit()

    rg = ReplayGain(*args, **kwargs)
    handlers = (
        ("all-finished", on_finished),
        ("error", on_error),
    )
    with util.gobject_signals(rg, *handlers):
        # The callbacks look ``loop`` up when they run, so it has to be
        # bound before the analysis is started.
        loop = GObject.MainLoop()
        rg.start()
        loop.run()

    if outcome["error"] is not None:
        raise outcome["error"]
    return rg.track_data, rg.album_data