This file is indexed.

/usr/lib/python2.7/dist-packages/Milter/test.py is in python-milter 1.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
## @package Milter.test
# A test framework for milters

import rfc822
import StringIO
import Milter

Milter.NOREPLY = Milter.CONTINUE

## Test mixin for unit testing %milter applications.
# This mixin overrides many Milter.MilterBase methods
# with stub versions that simply record what was done.
# @since 0.9.8
class TestBase(object):

  def __init__(self,logfile='test/milter.log'):
    self._protocol = 0
    self.logfp = open(logfile,"a")
    ## List of recipients deleted
    self._delrcpt = []
    ## List of recipients added
    self._addrcpt = []
    ## Macros defined
    self._macros = { }
    ## The message body.
    self._body = None
    ## True if the %milter replaced the message body.
    self._bodyreplaced = False
    ## True if the %milter changed any headers.
    self._headerschanged = False
    ## Reply codes and messages set by the %milter
    self._reply = None
    ## The rfc822 message object for the current email being fed to the %milter.
    self._msg = None
    self._symlist = [ None, None, None, None, None, None, None ]

  def log(self,*msg):
    for i in msg: print >>self.logfp, i,
    print >>self.logfp

  ## Set a macro value.
  # These are retrieved by the %milter with getsymval.
  # @param name the macro name, as passed to getsymval
  # @param val the macro value
  def setsymval(self,name,val):
    self._macros[name] = val
    
  def getsymval(self,name):
    # FIXME: track stage, and use _symlist
    return self._macros.get(name,'')

  def replacebody(self,chunk):
    if self._body:
      self._body.write(chunk)
      self._bodyreplaced = True
    else:
      raise IOError,"replacebody not called from eom()"

  # FIXME: rfc822 indexing does not really reflect the way chg/add header
  # work for a %milter
  def chgheader(self,field,idx,value):
    if not self._body:
      raise IOError,"chgheader not called from eom()"
    self.log('chgheader: %s[%d]=%s' % (field,idx,value))
    if value == '':
      del self._msg[field]
    else:
      self._msg[field] = value
    self._headerschanged = True

  def addheader(self,field,value,idx=-1):
    if not self._body:
      raise IOError,"addheader not called from eom()"
    self.log('addheader: %s=%s' % (field,value))
    self._msg[field] = value
    self._headerschanged = True

  def delrcpt(self,rcpt):
    if not self._body:
      raise IOError,"delrcpt not called from eom()"
    self._delrcpt.append(rcpt)

  def addrcpt(self,rcpt):
    if not self._body:
      raise IOError,"addrcpt not called from eom()"
    self._addrcpt.append(rcpt)

  ## Save the reply codes and messages in self._reply.
  def setreply(self,rcode,xcode,*msg):
    self._reply = (rcode,xcode) + msg

  def setsymlist(self,stage,macros):
    if not self._actions & SETSYMLIST: raise DisabledAction("SETSYMLIST")
    # not used yet, but just for grins we save the data
    a = []
    for m in macros:
      try:
        m = m.encode('utf8')
      except: pass
      try:
        m = m.split(' ')
      except: pass
      a += m
    self._symlist[stage] = set(a)

  ## Feed a file like object to the %milter.  Calls envfrom, envrcpt for
  # each recipient, header for each header field, body for each body 
  # block, and finally eom.  A return code from the %milter other than
  # CONTINUE returns immediately with that return code.
  #
  # This is a convenience method, a test could invoke the callbacks
  # in sequence on its own - and for some complex tests, this may
  # be necessary.
  # @param fp the file with rfc2822 message stream
  # @param sender the MAIL FROM
  # @param rcpt RCPT TO - additional recipients may follow
  def feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com",*rcpts):
    self._body = None
    self._bodyreplaced = False
    self._headerschanged = False
    self._reply = None
    msg = rfc822.Message(fp)
    rc = self.envfrom('<%s>'%sender)
    if rc != Milter.CONTINUE: return rc
    for rcpt in (rcpt,) + rcpts:
      rc = self.envrcpt('<%s>'%rcpt)
      if rc != Milter.CONTINUE: return rc
    line = None
    for h in msg.headers:
      if h[:1].isspace():
        line = line + h
        continue
      if not line:
        line = h
        continue
      s = line.split(': ',1)
      if len(s) > 1: val = s[1].strip()
      else: val = ''
      rc = self.header(s[0],val)
      if rc != Milter.CONTINUE: return rc
      line = h
    if line:
      s = line.split(': ',1)
      rc = self.header(s[0],s[1])
      if rc != Milter.CONTINUE: return rc
    rc = self.eoh()
    if rc != Milter.CONTINUE: return rc
    while 1:
      buf = fp.read(8192)
      if len(buf) == 0: break
      rc = self.body(buf)
      if rc != Milter.CONTINUE: return rc
    self._msg = msg
    self._body = StringIO.StringIO()
    rc = self.eom()
    if self._bodyreplaced:
      body = self._body.getvalue()
    else:
      msg.rewindbody()
      body = msg.fp.read()
    self._body = StringIO.StringIO()
    self._body.writelines(msg.headers)
    self._body.write('\n')
    self._body.write(body)
    return rc

  ## Feed an email contained in a file to the %milter.
  # This is a convenience method that invokes @link #feedFile feedFile @endlink.
  # @param sender MAIL FROM
  # @param rcpts RCPT TO, multiple recipients may be supplied
  def feedMsg(self,fname,sender="spam@adv.com",*rcpts):
    with open('test/'+fname,'r') as fp:
      return self.feedFile(fp,sender,*rcpts)

  ## Call the connect and helo callbacks.
  # The helo callback is not called if connect does not return CONTINUE.
  # @param host the hostname passed to the connect callback
  # @param helo the hostname passed to the helo callback
  # @param ip the IP address passed to the connect callback
  def connect(self,host='localhost',helo='spamrelay',ip='1.2.3.4'):
    self._body = None
    self._bodyreplaced = False
    opts = [ Milter.CURR_ACTS,~0,0,0 ]
    rc = self.negotiate(opts)
    rc =  super(TestBase,self).connect(host,1,(ip,1234)) 
    if rc != Milter.CONTINUE:
      self.close()
      return rc
    rc = self.hello(helo)
    if rc != Milter.CONTINUE:
      self.close()
    return rc