/usr/lib/python2.7/dist-packages/spambayes/compatcsv.py is in spambayes 1.1b1-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | #!/usr/bin/env python
"""Implement just enough of a csv parser to support sb_dbexpimp.py's needs."""
import sys
import re
if sys.platform == "windows":
EOL = "\r\n"
elif sys.platform == "mac":
EOL = "\r"
else:
EOL = "\n"
class reader:
def __init__(self, fp):
self.fp = fp
def __iter__(self):
return self
def _readline(self):
line = self.fp.readline()
# strip any EOL detritus
while line[-1:] in ("\r", "\n"):
line = line[:-1]
return line
def next(self):
line = self._readline()
if not line:
raise StopIteration
return self.parse_line(line)
def parse_line(self, line):
"""parse the line.
very simple assumptions:
* separator is a comma
* fields are only quoted with quotation marks and only
quoted if the field contains a comma or a quotation mark
* embedded quotation marks are doubled
"""
result = []
while line:
# quoted field
if line[0] == '"':
line = line[1:]
field = []
while True:
if line[0:2] == '""':
field.append('"')
line = line[2:]
elif line[0] == '"':
# end of field - skip quote and possible comma
line = line[1:]
if line[0:1] == ',':
line = line[1:]
break
else:
field.append(line[0])
line = line[1:]
# ran out of line before field
if not line:
field.append("\n")
line = self._readline()
if not line:
raise IOError, "end-of-file during parsing"
else:
# unquoted field
field = []
while line:
if line[0] == ',':
# end of field
line = line[1:]
break
else:
field.append(line[0])
line = line[1:]
result.append("".join(field))
return result
class writer:
def __init__(self, fp):
self.fp = fp
def writerow(self, row):
result = []
for item in row:
if isinstance(item, unicode):
item = item.encode("utf-8")
else:
item = str(item)
if re.search('["\n,]', item) is not None:
item = '"%s"' % item.replace('"', '""')
result.append(item)
result = ",".join(result)
self.fp.write(result+EOL)
if __name__ == "__main__":
import unittest
import StringIO
class TestCase(unittest.TestCase):
def test_reader(self):
f = StringIO.StringIO('''\
"""rare""",1,0
"beginning;
end=""itinhh.txt""",1,0
''')
f.seek(0)
rdr = reader(f)
self.assertEqual(rdr.next(), ['"rare"', '1', '0'])
self.assertEqual(rdr.next(),
['beginning;\n\tend="itinhh.txt"','1', '0'])
self.assertRaises(StopIteration, rdr.next)
unittest.main()
|