/usr/lib/python2.7/dist-packages/VirtualMailManager/common.py is in vmm 0.6.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 | # -*- coding: UTF-8 -*-
# Copyright (c) 2010 - 2014, Pascal Volk
# See COPYING for distribution information.
"""
VirtualMailManager.common
~~~~~~~~~~~~~~~~~~~~~~~~~
Some common functions
"""
import locale
import os
import re
import stat
from VirtualMailManager import ENCODING
from VirtualMailManager.constants import INVALID_MAIL_LOCATION, \
NOT_EXECUTABLE, NO_SUCH_BINARY, TYPE_ACCOUNT, TYPE_ALIAS, TYPE_RELOCATED
from VirtualMailManager.errors import VMMError
VERSION_RE = re.compile(r'^(\d+)\.(\d+)\.(?:(\d+)|(alpha|beta|rc)(\d+))$')
_version_level = dict(alpha=0xA, beta=0xB, rc=0xC)
_version_cache = {}
_ = lambda msg: msg
def expand_path(path):
"""Expands paths, starting with ``.`` or ``~``, to an absolute path."""
if path.startswith('.'):
return os.path.abspath(path)
if path.startswith('~'):
return os.path.expanduser(path)
return path
def get_unicode(string):
"""Converts `string` to `unicode`, if necessary."""
if isinstance(string, unicode):
return string
return unicode(string, ENCODING, 'replace')
def lisdir(path):
"""Checks if `path` is a directory. Doesn't follow symbolic links.
Returns bool.
"""
try:
lstat = os.lstat(path)
except OSError:
return False
return stat.S_ISDIR(lstat.st_mode)
def exec_ok(binary):
"""Checks if the `binary` exists and if it is executable.
Throws a `VMMError` if the `binary` isn't a file or is not
executable.
"""
binary = expand_path(binary)
if not os.path.isfile(binary):
raise VMMError(_(u"No such file: '%s'") % get_unicode(binary),
NO_SUCH_BINARY)
if not os.access(binary, os.X_OK):
raise VMMError(_(u"File is not executable: '%s'") %
get_unicode(binary), NOT_EXECUTABLE)
return binary
def human_size(size):
"""Converts the `size` in bytes in human readable format."""
if not isinstance(size, (long, int)):
try:
size = long(size)
except ValueError:
raise TypeError("'size' must be a positive long or int.")
if size < 0:
raise ValueError("'size' must be a positive long or int.")
if size < 1024:
return str(size)
# TP: abbreviations of gibibyte, tebibyte kibibyte and mebibyte
prefix_multiply = ((_(u'TiB'), 1 << 40), (_(u'GiB'), 1 << 30),
(_(u'MiB'), 1 << 20), (_(u'KiB'), 1 << 10))
for prefix, multiply in prefix_multiply:
if size >= multiply:
# TP: e.g.: '%(size)s %(prefix)s' -> '118.30 MiB'
return _(u'%(size)s %(prefix)s') % {
'size': locale.format('%.2f', float(size) / multiply,
True).decode(ENCODING, 'replace'),
'prefix': prefix}
def size_in_bytes(size):
"""Converts the string `size` to a long (size in bytes).
The string `size` can be suffixed with *b* (bytes), *k* (kilobytes),
*M* (megabytes) or *G* (gigabytes).
"""
if not isinstance(size, basestring) or not size:
raise TypeError('size must be a non empty string.')
if size[-1].upper() in ('B', 'K', 'M', 'G'):
try:
num = int(size[:-1])
except ValueError:
raise ValueError('Not a valid integer value: %r' % size[:-1])
unit = size[-1].upper()
if unit == 'B':
return num
elif unit == 'K':
return num << 10L
elif unit == 'M':
return num << 20L
else:
return num << 30L
else:
try:
num = int(size)
except ValueError:
raise ValueError('Not a valid size value: %r' % size)
return num
def validate_transport(transport, maillocation):
"""Checks if the `transport` is usable for the given `maillocation`.
Throws a `VMMError` if the chosen `transport` is unable to write
messages in the `maillocation`'s mailbox format.
Arguments:
`transport` : VirtualMailManager.transport.Transport
a Transport object
`maillocation` : VirtualMailManager.maillocation.MailLocation
a MailLocation object
"""
if transport.transport in ('virtual', 'virtual:') and \
not maillocation.postfix:
raise VMMError(_(u"Invalid transport '%(transport)s' for mailbox "
u"format '%(mbfmt)s'.") %
{'transport': transport.transport,
'mbfmt': maillocation.mbformat}, INVALID_MAIL_LOCATION)
def version_hex(version_string):
"""Converts a Dovecot version, e.g.: '1.2.3' or '2.0.beta4', to an int.
Raises a `ValueError` if the *version_string* has the wrong™ format.
version_hex('1.2.3') -> 270548736
hex(version_hex('1.2.3')) -> '0x10203f00'
"""
global _version_cache
if version_string in _version_cache:
return _version_cache[version_string]
version = 0
version_mo = VERSION_RE.match(version_string)
if not version_mo:
raise ValueError('Invalid version string: %r' % version_string)
major, minor, patch, level, serial = version_mo.groups()
major = int(major)
minor = int(minor)
if patch:
patch = int(patch)
if serial:
serial = int(serial)
if major > 0xFF or minor > 0xFF or \
patch and patch > 0xFF or serial and serial > 0xFF:
raise ValueError('Invalid version string: %r' % version_string)
version += major << 28
version += minor << 20
if patch:
version += patch << 12
version += _version_level.get(level, 0xF) << 8
if serial:
version += serial
_version_cache[version_string] = version
return version
def version_str(version):
"""Converts a Dovecot version previously converted with version_hex back to
a string.
Raises a `TypeError` if *version* is not an int/long.
Raises a `ValueError` if *version* is an incorrect int version.
"""
global _version_cache
if version in _version_cache:
return _version_cache[version]
if not isinstance(version, (int, long)):
raise TypeError('Argument is not a int/long: %r', version)
major = (version >> 28) & 0xFF
minor = (version >> 20) & 0xFF
patch = (version >> 12) & 0xFF
level = (version >> 8) & 0x0F
serial = version & 0xFF
levels = dict(zip(_version_level.values(), _version_level.keys()))
if level == 0xF and not serial:
version_string = '%u.%u.%u' % (major, minor, patch)
elif level in levels and not patch:
version_string = '%u.%u.%s%u' % (major, minor, levels[level], serial)
else:
raise ValueError('Invalid version: %r' % hex(version))
_version_cache[version] = version_string
return version_string
def format_domain_default(domaindata):
"""Format info output when the value displayed is the domain default."""
# TP: [domain default] indicates that a user's setting is the same as
# configured in the user's domain.
# e.g.: [ 0.84%] 42/5,000 [domain default]
return _(u'%s [domain default]') % domaindata
def search_addresses(dbh, typelimit=None, lpattern=None, llike=False,
dpattern=None, dlike=False):
"""'Search' for addresses by *pattern* in the database.
The search is limited by *typelimit*, a bitfield with values TYPE_ACCOUNT,
TYPE_ALIAS, TYPE_RELOCATED, or a bitwise OR thereof. If no limit is
specified, all types will be searched.
*lpattern* may be a local part or a partial local part - starting and/or
ending with a '%' sign. When the *lpattern* starts or ends with a '%' sign
*llike* has to be `True` to perform a wildcard search. To retrieve all
available addresses use the arguments' default values.
*dpattern* and *dlike* behave analogously for the domain part of an
address, allowing for separate pattern matching: testuser%@example.%
The return value of this function is a tuple. The first element is a list
of domain IDs sorted alphabetically by the corresponding domain names. The
second element is a dictionary indexed by domain ID, holding lists to
associated addresses. Each address is itself actually a tuple of address,
type, and boolean indicating whether the address stems from an alias
domain.
"""
if typelimit is None:
typelimit = TYPE_ACCOUNT | TYPE_ALIAS | TYPE_RELOCATED
queries = []
if typelimit & TYPE_ACCOUNT:
queries.append('SELECT gid, local_part, %d AS type FROM users'
% TYPE_ACCOUNT)
if typelimit & TYPE_ALIAS:
queries.append('SELECT DISTINCT gid, address as local_part, '
'%d AS type FROM alias' % TYPE_ALIAS)
if typelimit & TYPE_RELOCATED:
queries.append('SELECT gid, address as local_part, %d AS type '
'FROM relocated' % TYPE_RELOCATED)
sql = "SELECT gid, local_part || '@' || domainname AS address, "
sql += 'type, NOT is_primary AS from_aliasdomain FROM ('
sql += ' UNION '.join(queries)
sql += ') a JOIN domain_name USING (gid)'
nextkw = 'WHERE'
sqlargs = []
for like, field, pattern in ((dlike, 'domainname', dpattern),
(llike, 'local_part', lpattern)):
if like:
match = 'LIKE'
else:
if not pattern:
continue
match = '='
sql += ' %s %s %s %%s' % (nextkw, field, match)
sqlargs.append(pattern)
nextkw = 'AND'
sql += ' ORDER BY domainname, local_part'
dbc = dbh.cursor()
dbc.execute(sql, sqlargs)
result = dbc.fetchall()
dbc.close()
gids = []
daddrs = {}
for gid, address, addrtype, aliasdomain in result:
if gid not in daddrs:
gids.append(gid)
daddrs[gid] = []
daddrs[gid].append((address, addrtype, aliasdomain))
return gids, daddrs
del _
|