/usr/share/pyshared/lamson/queue.py is in python-lamson 1.0pre11-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Simpler queue management than the regular mailbox.Maildir stuff. You
do get a lot more features from the Python library, so if you need
to do some serious surgery go use that. This works as a good
API for the 90% case of "put mail in, get mail out" queues.
"""
import mailbox
from lamson import mail
import hashlib
import socket
import time
import os
import errno
import logging
# The hostname does not change between messages, so it is hashed once at
# import time rather than for every email put in a queue.
_hostname = socket.gethostname()
if not isinstance(_hostname, bytes):
    # On Python 3 gethostname() returns text, but hashlib only accepts bytes.
    # On Python 2 the value is already a byte string and is used unchanged.
    _hostname = _hostname.encode("utf-8")
HASHED_HOSTNAME = hashlib.md5(_hostname).hexdigest()
del _hostname
class SafeMaildir(mailbox.Maildir):
    """
    A Maildir whose generated file names end in HASHED_HOSTNAME (an MD5
    digest of the hostname) rather than a plain hostname, so the queue
    directory can be exposed without revealing the machine's name.
    """

    def _create_tmp(self):
        """
        Create a uniquely named file in the maildir's tmp subdirectory and
        return the result of mailbox._create_carefully() for it.

        The name is built from the current time (seconds and microseconds),
        the process id, the shared mailbox.Maildir._count counter and
        HASHED_HOSTNAME.

        Raises mailbox.ExternalClashError if a file with that name already
        exists, and re-raises any other OSError from stat or file creation.
        """
        now = time.time()
        uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
                                    mailbox.Maildir._count, HASHED_HOSTNAME)
        path = os.path.join(self._path, 'tmp', uniq)
        try:
            os.stat(path)
        except OSError as stat_error:
            if stat_error.errno != errno.ENOENT:
                raise
            # The name is free: claim it and bump the shared counter so the
            # next name generated by this process differs.
            mailbox.Maildir._count += 1
            try:
                return mailbox._create_carefully(path)
            except OSError as create_error:
                # EEXIST means something else created the file between the
                # stat and the create; that is reported as a clash below.
                if create_error.errno != errno.EEXIST:
                    raise
        # Fall through to here if stat succeeded or open raised EEXIST.
        raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path)
class QueueError(Exception):
    """
    Raised when a message taken from a queue cannot be turned into a
    request object. The undecoded message is kept on the exception.
    """

    def __init__(self, msg, data):
        super(QueueError, self).__init__(msg)
        # data holds the raw message contents that failed to decode;
        # _message keeps the human readable description.
        self.data = data
        self._message = msg
class Queue(object):
    """
    Provides a simplified API for dealing with 'queues' in Lamson.
    It currently just supports maildir queues since those are the
    most robust, but could implement others later.
    """

    def __init__(self, queue_dir, safe=False, pop_limit=0, oversize_dir=None):
        """
        This gives the Maildir queue directory to use, and whether you want
        this Queue to use the SafeMaildir variant which hashes the hostname
        so you can expose it publicly.

        The pop_limit and oversize_dir both set an upper limit on the mail
        you pop out of the queue.  The size is checked before any Lamson
        processing is done and is based on the size of the file on disk.  The
        purpose is to prevent people from sending 10MB attachments.  If a
        message is over the pop_limit then it is placed into the
        oversize_dir (which should be a maildir).  A pop_limit of 0 disables
        the check.

        The oversize protection only works on pop messages off, not
        putting them in, get, or any other call.  If you use get you can
        use self.oversize to also check if it's oversize manually.
        """
        self.dir = queue_dir

        if safe:
            self.mbox = SafeMaildir(queue_dir)
        else:
            self.mbox = mailbox.Maildir(queue_dir)

        self.pop_limit = pop_limit

        if oversize_dir:
            if not os.path.exists(oversize_dir):
                # Instantiating a Maildir creates the directory layout on
                # disk; the object itself is not needed afterwards.
                mailbox.Maildir(oversize_dir)

            # Oversize messages are dropped straight into the "new"
            # subdirectory of the oversize maildir.
            self.oversize_dir = os.path.join(oversize_dir, "new")

            if not os.path.exists(self.oversize_dir):
                os.mkdir(self.oversize_dir)
        else:
            self.oversize_dir = None

    def push(self, message):
        """
        Pushes the message onto the queue.  Remember the order is probably
        not maintained.  It returns the key that gets created.
        """
        return self.mbox.add(str(message))

    def pop(self):
        """
        Pops a message off the queue, order is not really maintained
        like a stack.

        It returns a (key, message) tuple for that item, or (None, None)
        when there is nothing left to pop.

        Messages over the pop_limit are never returned: each one is moved
        into oversize_dir when that is configured, or deleted otherwise, and
        the search continues with the next key.

        The popped message is removed from the queue even when decoding it
        raises QueueError, so a broken message cannot block the queue.
        """
        for key in self.mbox.iterkeys():
            over, over_name = self.oversize(key)

            if over:
                if self.oversize_dir:
                    logging.info("Message key %s over size limit %d, moving to %s.",
                                 key, self.pop_limit, self.oversize_dir)
                    os.rename(over_name, os.path.join(self.oversize_dir, key))
                else:
                    logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
                                 key, self.pop_limit)
                    os.unlink(over_name)
                continue

            try:
                msg = self.get(key)
            finally:
                # Runs on success and on failure; any exception from get()
                # propagates with its original traceback.
                self.remove(key)
            return key, msg

        return None, None

    def get(self, key):
        """
        Get the specific message referenced by the key.  The message is NOT
        removed from the queue.

        Raises QueueError, carrying the raw message data, if the contents
        cannot be turned into a mail.MailRequest.
        """
        msg_file = self.mbox.get_file(key)

        if not msg_file:
            return None

        try:
            msg_data = msg_file.read()
        finally:
            # get_file() hands back an open file; release it once read.
            msg_file.close()

        try:
            return mail.MailRequest(self.dir, None, None, msg_data)
        except Exception as exc:
            raise QueueError("Failed to decode message: %s" % exc, msg_data)

    def remove(self, key):
        """Removes the message for the key from the queue without returning it."""
        self.mbox.remove(key)

    def count(self):
        """Returns the number of messages in the queue."""
        return len(self.mbox)

    def clear(self):
        """
        Clears out the contents of the entire queue.

        Warning: This could be horribly inefficient since it
        basically pops until the queue is empty.
        """
        # man this is probably a really bad idea
        while self.count() > 0:
            self.pop()

    def keys(self):
        """
        Returns the keys in the queue.
        """
        return self.mbox.keys()

    def oversize(self, key):
        """
        Checks the on-disk size of the message for the key against pop_limit.

        Returns an (is_over, file_name) tuple.  When no pop_limit is set the
        result is always (False, None).  Raises OSError if no file for the
        key can be found.
        """
        if not self.pop_limit:
            return False, None

        file_name = os.path.join(self.dir, "new", key)

        if not os.path.isfile(file_name):
            # The message is not in new/, so it may have been moved to cur/
            # (with a flag suffix on its name).  _lookup is a private
            # mailbox.Maildir helper that maps a key to its current
            # subdirectory-relative path.
            try:
                file_name = os.path.join(self.dir, self.mbox._lookup(key))
            except KeyError:
                # Unknown key: leave file_name pointing at new/ so getsize
                # below raises the same OSError it always did.
                pass

        return os.path.getsize(file_name) > self.pop_limit, file_name
|