This file is indexed.

/usr/lib/python2.7/dist-packages/VirtualMailManager/common.py is in vmm 0.6.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# -*- coding: UTF-8 -*-
# Copyright (c) 2010 - 2014, Pascal Volk
# See COPYING for distribution information.
"""
    VirtualMailManager.common
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    Some common functions
"""

import locale
import os
import re
import stat

from VirtualMailManager import ENCODING
from VirtualMailManager.constants import INVALID_MAIL_LOCATION, \
     NOT_EXECUTABLE, NO_SUCH_BINARY, TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED
from VirtualMailManager.errors import VMMError

# Matches version strings such as '1.2.3' or '2.0.beta4': major and minor
# number, followed by either a plain patch number or a pre-release level
# (alpha, beta or rc) with its serial number directly attached.
# Groups: (major, minor, patch, level, serial).
VERSION_RE = re.compile(r'^(\d+)\.(\d+)\.(?:(\d+)|(alpha|beta|rc)(\d+))$')

# Numeric codes of the pre-release levels; version_hex() uses 0xF when a
# version has no level at all.
_version_level = dict(alpha=0xA, beta=0xB, rc=0xC)
# Memo used by both version_hex() (string -> int) and version_str()
# (int -> string).
_version_cache = {}
# No-op stand-in so that strings can be wrapped in `_(...)`; it is removed
# again by the `del _` statement at the end of this module.
# NOTE(review): presumably the real translation function is installed as a
# builtin elsewhere (e.g. via gettext) -- confirm.
_ = lambda msg: msg


def expand_path(path):
    """Return `path` expanded, if it starts with ``.`` or ``~``.

    A path with a leading ``.`` is passed through `os.path.abspath`, a
    path with a leading ``~`` through `os.path.expanduser`.  Every other
    path is returned unchanged.
    """
    expanders = (('.', os.path.abspath), ('~', os.path.expanduser))
    for prefix, expand in expanders:
        if path.startswith(prefix):
            return expand(path)
    return path


def get_unicode(string):
    """Return `string` as a `unicode` object.

    `unicode` objects are handed back untouched.  Everything else is
    decoded using the module wide ``ENCODING``; undecodable bytes are
    replaced instead of raising an error.
    """
    if not isinstance(string, unicode):
        string = unicode(string, ENCODING, 'replace')
    return string


def lisdir(path):
    """Tell whether `path` itself is a directory.

    The check uses `os.lstat`, so a symbolic link is never followed - a
    link pointing to a directory is *not* reported as a directory.
    Returns `False` as well if `path` can't be stat'ed at all.
    """
    try:
        mode = os.lstat(path).st_mode
    except OSError:
        # nonexistent or inaccessible path
        return False
    return stat.S_ISDIR(mode)


def exec_ok(binary):
    """Verifies that `binary` is an existing, executable file.

    The path is expanded with `expand_path` first; the expanded path is
    returned on success.

    Throws a `VMMError` with the code `NO_SUCH_BINARY` if `binary` isn't
    a file, or with the code `NOT_EXECUTABLE` if the file may not be
    executed.
    """
    binary = expand_path(binary)
    if os.path.isfile(binary):
        if os.access(binary, os.X_OK):
            return binary
        raise VMMError(_(u"File is not executable: '%s'") %
                       get_unicode(binary), NOT_EXECUTABLE)
    raise VMMError(_(u"No such file: '%s'") % get_unicode(binary),
                   NO_SUCH_BINARY)


def human_size(size):
    """Returns the `size` (in bytes) in a human readable format.

    Sizes below 1024 are returned as a plain string of digits.  Bigger
    sizes are divided by the largest fitting binary unit (KiB, MiB, GiB,
    TiB) and formatted, locale aware, with two decimal places.

    Raises a `TypeError` if `size` can't be converted to a long and a
    `ValueError` if `size` is negative.
    """
    if not isinstance(size, (long, int)):
        try:
            size = long(size)
        except ValueError:
            raise TypeError("'size' must be a positive long or int.")
    if size < 0:
        raise ValueError("'size' must be a positive long or int.")
    if size < 1024:
        return str(size)
    # TP: abbreviations of gibibyte, tebibyte kibibyte and mebibyte
    units = ((_(u'TiB'), 1 << 40), (_(u'GiB'), 1 << 30),
             (_(u'MiB'), 1 << 20), (_(u'KiB'), 1 << 10))
    # units are ordered from largest to smallest: the first fit wins
    for prefix, multiply in units:
        if size < multiply:
            continue
        # TP: e.g.: '%(size)s %(prefix)s' -> '118.30 MiB'
        template = _(u'%(size)s %(prefix)s')
        number = locale.format('%.2f', float(size) / multiply, True)
        return template % {'size': number.decode(ENCODING, 'replace'),
                           'prefix': prefix}


def size_in_bytes(size):
    """Returns the number of bytes described by the string `size`.

    `size` may end with one of the (case insensitive) suffixes *b*
    (bytes), *k* (kilobytes), *M* (megabytes) or *G* (gigabytes).  A
    string without a suffix is taken as a number of bytes.

    Raises a `TypeError` if `size` is not a non-empty string and a
    `ValueError` if the numeric part is not a valid integer.
    """
    if not (isinstance(size, basestring) and size):
        raise TypeError('size must be a non empty string.')
    # number of bits to shift for each multiplying suffix
    shifts = {'K': 10, 'M': 20, 'G': 30}
    unit = size[-1].upper()
    if unit != 'B' and unit not in shifts:
        # no known suffix: the whole string has to be the number
        try:
            return int(size)
        except ValueError:
            raise ValueError('Not a valid size value: %r' % size)
    digits = size[:-1]
    try:
        num = int(digits)
    except ValueError:
        raise ValueError('Not a valid integer value: %r' % digits)
    if unit == 'B':
        return num
    return long(num) << shifts[unit]


def validate_transport(transport, maillocation):
    """Verifies that `transport` can deliver into `maillocation`.

    Only the transports ``virtual`` and ``virtual:`` are restricted: they
    require a `maillocation` whose `postfix` attribute is true.  In any
    other case the function simply returns.

    Throws a `VMMError` (code `INVALID_MAIL_LOCATION`) if the chosen
    `transport` is unable to write messages in the `maillocation`'s
    mailbox format.

    Arguments:

    `transport` : VirtualMailManager.transport.Transport
      a Transport object
    `maillocation` : VirtualMailManager.maillocation.MailLocation
      a MailLocation object
    """
    is_virtual = transport.transport in ('virtual', 'virtual:')
    if not is_virtual or maillocation.postfix:
        return
    details = {'transport': transport.transport,
               'mbfmt': maillocation.mbformat}
    raise VMMError(_(u"Invalid transport '%(transport)s' for mailbox "
                     u"format '%(mbfmt)s'.") % details,
                   INVALID_MAIL_LOCATION)


def version_hex(version_string):
    """Converts a Dovecot version, e.g.: '1.2.3' or '2.0.beta4', to an int.
    Raises a `ValueError` if the *version_string* has the wrong format or
    if one of its numbers is greater than 255.

    The result is composed as::

      major << 28 | minor << 20 | patch << 12 | level << 8 | serial

    where *level* is 0xA (alpha), 0xB (beta), 0xC (rc) or 0xF (final
    release).

    version_hex('1.2.3') -> 270548736
    hex(version_hex('1.2.3')) -> '0x10203f00'
    """
    # The cache is shared with version_str(), which stores int -> string
    # entries.  Only an integer hit is a result of this function; anything
    # else must not be handed out as a converted version.
    cached = _version_cache.get(version_string)
    if isinstance(cached, (int, long)):
        return cached
    version_mo = VERSION_RE.match(version_string)
    if not version_mo:
        raise ValueError('Invalid version string: %r' % version_string)
    major, minor, patch, level, serial = version_mo.groups()
    major = int(major)
    minor = int(minor)
    # patch and (level, serial) are mutually exclusive; the missing part
    # contributes nothing to the result
    patch = int(patch) if patch else 0
    serial = int(serial) if serial else 0

    if max(major, minor, patch, serial) > 0xFF:
        raise ValueError('Invalid version string: %r' % version_string)

    version = (major << 28) + (minor << 20) + (patch << 12) + \
              (_version_level.get(level, 0xF) << 8) + serial

    _version_cache[version_string] = version
    return version


def version_str(version):
    """Converts a Dovecot version previously converted with version_hex back to
    a string.
    Raises a `TypeError` if *version* is not an int/long.
    Raises a `ValueError` if *version* is an incorrect int version.
    """
    # Check the type before looking into the cache: the cache is shared with
    # version_hex(), so a version *string* could otherwise produce a hit and
    # an int would be returned instead of raising the TypeError.
    if not isinstance(version, (int, long)):
        raise TypeError('Argument is not a int/long: %r' % (version,))
    if version in _version_cache:
        return _version_cache[version]
    major = (version >> 28) & 0xFF
    minor = (version >> 20) & 0xFF
    patch = (version >> 12) & 0xFF
    level = (version >> 8) & 0x0F
    serial = version & 0xFF

    # reverse mapping: level code -> level name
    levels = dict((code, name) for name, code in _version_level.items())
    if level == 0xF and not serial:
        # final release: major.minor.patch
        version_string = '%u.%u.%u' % (major, minor, patch)
    elif level in levels and not patch:
        # pre-release: major.minor.<level><serial>
        version_string = '%u.%u.%s%u' % (major, minor, levels[level], serial)
    else:
        raise ValueError('Invalid version: %r' % hex(version))

    _version_cache[version] = version_string
    return version_string


def format_domain_default(domaindata):
    """Returns `domaindata` with a (translated) '[domain default]' marker
    appended, for info output of values that equal the domain's setting.
    """
    # TP: [domain default] indicates that a user's setting is the same as
    # configured in the user's domain.
    # e.g.: [  0.84%] 42/5,000 [domain default]
    template = _(u'%s [domain default]')
    return template % domaindata


def search_addresses(dbh, typelimit=None, lpattern=None, llike=False,
                     dpattern=None, dlike=False):
    """'Search' for addresses by *pattern* in the database.

    The search is limited by *typelimit*, a bitfield with values TYPE_ACCOUNT,
    TYPE_ALIAS, TYPE_RELOCATED, or a bitwise OR thereof. If no limit is
    specified, all types will be searched. A *typelimit* that selects none
    of these types yields an empty result.

    *lpattern* may be a local part or a partial local part - starting and/or
    ending with a '%' sign.  When the *lpattern* starts or ends with a '%' sign
    *llike* has to be `True` to perform a wildcard search. To retrieve all
    available addresses use the arguments' default values.

    *dpattern* and *dlike* behave analogously for the domain part of an
    address, allowing for separate pattern matching: testuser%@example.%

    The return value of this function is a tuple. The first element is a list
    of domain IDs sorted alphabetically by the corresponding domain names. The
    second element is a dictionary indexed by domain ID, holding lists to
    associated addresses. Each address is itself actually a tuple of address,
    type, and boolean indicating whether the address stems from an alias
    domain.

    The cursor used for the query is closed in any case, also when the
    query fails; database errors are passed on to the caller.
    """
    if typelimit is None:
        typelimit = TYPE_ACCOUNT | TYPE_ALIAS | TYPE_RELOCATED
    queries = []
    if typelimit & TYPE_ACCOUNT:
        queries.append('SELECT gid, local_part, %d AS type FROM users'
                       % TYPE_ACCOUNT)
    if typelimit & TYPE_ALIAS:
        queries.append('SELECT DISTINCT gid, address as local_part, '
                       '%d AS type FROM alias' % TYPE_ALIAS)
    if typelimit & TYPE_RELOCATED:
        queries.append('SELECT gid, address as local_part, %d AS type '
                       'FROM relocated' % TYPE_RELOCATED)
    if not queries:
        # No address type selected: an empty sub-select ("FROM ()") would
        # only produce a SQL syntax error - there is nothing to search.
        return [], {}
    sql = ("SELECT gid, local_part || '@' || domainname AS address, "
           'type, NOT is_primary AS from_aliasdomain FROM (' +
           ' UNION '.join(queries) +
           ') a JOIN domain_name USING (gid)')
    nextkw = 'WHERE'
    sqlargs = []
    # Only the fixed keywords/column names are interpolated into the
    # statement; the patterns themselves are passed as query parameters.
    for like, field, pattern in ((dlike, 'domainname', dpattern),
                                 (llike, 'local_part', lpattern)):
        if like:
            match = 'LIKE'
        else:
            if not pattern:
                continue
            match = '='
        sql += ' %s %s %s %%s' % (nextkw, field, match)
        sqlargs.append(pattern)
        nextkw = 'AND'
    sql += ' ORDER BY domainname, local_part'
    dbc = dbh.cursor()
    try:
        dbc.execute(sql, sqlargs)
        result = dbc.fetchall()
    finally:
        # don't leak the cursor if the query fails
        dbc.close()

    # Group the rows by domain ID; `gids` keeps the order of first
    # appearance, i.e. the ORDER BY domainname of the query.
    gids = []
    daddrs = {}
    for gid, address, addrtype, aliasdomain in result:
        if gid not in daddrs:
            gids.append(gid)
            daddrs[gid] = []
        daddrs[gid].append((address, addrtype, aliasdomain))
    return gids, daddrs

# Remove the module-level no-op `_` again: it is only needed while the
# module is being defined.  The `_(...)` calls inside the functions above
# are resolved when the functions run.
# NOTE(review): presumably they then find a translation function installed
# as a builtin (e.g. by gettext) -- confirm.
del _