/usr/lib/python2.7/dist-packages/Milter/test.py is in python-milter 1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | ## @package Milter.test
# A test framework for milters
import rfc822
import StringIO
import Milter
Milter.NOREPLY = Milter.CONTINUE
## Test mixin for unit testing %milter applications.
# This mixin overrides many Milter.MilterBase methods
# with stub versions that simply record what was done.
# @since 0.9.8
class TestBase(object):
def __init__(self,logfile='test/milter.log'):
self._protocol = 0
self.logfp = open(logfile,"a")
## List of recipients deleted
self._delrcpt = []
## List of recipients added
self._addrcpt = []
## Macros defined
self._macros = { }
## The message body.
self._body = None
## True if the %milter replaced the message body.
self._bodyreplaced = False
## True if the %milter changed any headers.
self._headerschanged = False
## Reply codes and messages set by the %milter
self._reply = None
## The rfc822 message object for the current email being fed to the %milter.
self._msg = None
self._symlist = [ None, None, None, None, None, None, None ]
def log(self,*msg):
for i in msg: print >>self.logfp, i,
print >>self.logfp
## Set a macro value.
# These are retrieved by the %milter with getsymval.
# @param name the macro name, as passed to getsymval
# @param val the macro value
def setsymval(self,name,val):
self._macros[name] = val
def getsymval(self,name):
# FIXME: track stage, and use _symlist
return self._macros.get(name,'')
def replacebody(self,chunk):
if self._body:
self._body.write(chunk)
self._bodyreplaced = True
else:
raise IOError,"replacebody not called from eom()"
# FIXME: rfc822 indexing does not really reflect the way chg/add header
# work for a %milter
def chgheader(self,field,idx,value):
if not self._body:
raise IOError,"chgheader not called from eom()"
self.log('chgheader: %s[%d]=%s' % (field,idx,value))
if value == '':
del self._msg[field]
else:
self._msg[field] = value
self._headerschanged = True
def addheader(self,field,value,idx=-1):
if not self._body:
raise IOError,"addheader not called from eom()"
self.log('addheader: %s=%s' % (field,value))
self._msg[field] = value
self._headerschanged = True
def delrcpt(self,rcpt):
if not self._body:
raise IOError,"delrcpt not called from eom()"
self._delrcpt.append(rcpt)
def addrcpt(self,rcpt):
if not self._body:
raise IOError,"addrcpt not called from eom()"
self._addrcpt.append(rcpt)
## Save the reply codes and messages in self._reply.
def setreply(self,rcode,xcode,*msg):
self._reply = (rcode,xcode) + msg
def setsymlist(self,stage,macros):
if not self._actions & SETSYMLIST: raise DisabledAction("SETSYMLIST")
# not used yet, but just for grins we save the data
a = []
for m in macros:
try:
m = m.encode('utf8')
except: pass
try:
m = m.split(' ')
except: pass
a += m
self._symlist[stage] = set(a)
## Feed a file like object to the %milter. Calls envfrom, envrcpt for
# each recipient, header for each header field, body for each body
# block, and finally eom. A return code from the %milter other than
# CONTINUE returns immediately with that return code.
#
# This is a convenience method, a test could invoke the callbacks
# in sequence on its own - and for some complex tests, this may
# be necessary.
# @param fp the file with rfc2822 message stream
# @param sender the MAIL FROM
# @param rcpt RCPT TO - additional recipients may follow
def feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com",*rcpts):
self._body = None
self._bodyreplaced = False
self._headerschanged = False
self._reply = None
msg = rfc822.Message(fp)
rc = self.envfrom('<%s>'%sender)
if rc != Milter.CONTINUE: return rc
for rcpt in (rcpt,) + rcpts:
rc = self.envrcpt('<%s>'%rcpt)
if rc != Milter.CONTINUE: return rc
line = None
for h in msg.headers:
if h[:1].isspace():
line = line + h
continue
if not line:
line = h
continue
s = line.split(': ',1)
if len(s) > 1: val = s[1].strip()
else: val = ''
rc = self.header(s[0],val)
if rc != Milter.CONTINUE: return rc
line = h
if line:
s = line.split(': ',1)
rc = self.header(s[0],s[1])
if rc != Milter.CONTINUE: return rc
rc = self.eoh()
if rc != Milter.CONTINUE: return rc
while 1:
buf = fp.read(8192)
if len(buf) == 0: break
rc = self.body(buf)
if rc != Milter.CONTINUE: return rc
self._msg = msg
self._body = StringIO.StringIO()
rc = self.eom()
if self._bodyreplaced:
body = self._body.getvalue()
else:
msg.rewindbody()
body = msg.fp.read()
self._body = StringIO.StringIO()
self._body.writelines(msg.headers)
self._body.write('\n')
self._body.write(body)
return rc
## Feed an email contained in a file to the %milter.
# This is a convenience method that invokes @link #feedFile feedFile @endlink.
# @param sender MAIL FROM
# @param rcpts RCPT TO, multiple recipients may be supplied
def feedMsg(self,fname,sender="spam@adv.com",*rcpts):
with open('test/'+fname,'r') as fp:
return self.feedFile(fp,sender,*rcpts)
## Call the connect and helo callbacks.
# The helo callback is not called if connect does not return CONTINUE.
# @param host the hostname passed to the connect callback
# @param helo the hostname passed to the helo callback
# @param ip the IP address passed to the connect callback
def connect(self,host='localhost',helo='spamrelay',ip='1.2.3.4'):
self._body = None
self._bodyreplaced = False
opts = [ Milter.CURR_ACTS,~0,0,0 ]
rc = self.negotiate(opts)
rc = super(TestBase,self).connect(host,1,(ip,1234))
if rc != Milter.CONTINUE:
self.close()
return rc
rc = self.hello(helo)
if rc != Milter.CONTINUE:
self.close()
return rc
|