This file is indexed.

/usr/bin/trytond_import_zip is in tryton-modules-country 4.6.0-1.

This file is owned by root:root, with mode 0o755.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
#!/usr/bin/python3
# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import csv
import os
import sys
import zipfile

from argparse import ArgumentParser
from io import BytesIO

try:
    import urllib2
except ImportError:
    # Python 3: urllib2 no longer exists, its urlopen lives in
    # urllib.request.  Bind that module to the old name so that
    # `urllib2.urlopen` keeps working on both major versions.
    import urllib.request as urllib2

try:
    from progressbar import ProgressBar, Bar, ETA, SimpleProgress
except ImportError:
    ProgressBar = None

try:
    from proteus import Model, config
except ImportError:
    prog = os.path.basename(sys.argv[0])
    sys.exit("proteus must be installed to use %s" % prog)


def clean(code):
    """Delete every ``country.zip`` record whose country has code ``code``.

    Progress is reported on stderr as ``Cleaning`` followed by ``.`` and a
    newline once the deletion call has returned.
    """
    sys.stderr.write('Cleaning')
    Zip = Model.get('country.zip')
    Zip._proxy.delete(
        [z.id for z in Zip.find([('country.code', '=', code)])], {})
    # `print >> sys.stderr, '.'` is Python 2 only syntax; under Python 3 it
    # raises TypeError.  A plain write behaves the same on both versions.
    sys.stderr.write('.\n')


def fetch(code):
    """Download the GeoNames zip archive for ``code`` and return its data.

    The archive ``<code>.zip`` is fetched from download.geonames.org and
    the content of its ``<code>.txt`` member is returned as a byte string.
    Progress is reported on stderr as ``Fetching`` followed by ``.`` and a
    newline once the member has been extracted.

    Errors raised by the download or by ``zipfile`` (for instance a missing
    member) are propagated to the caller.
    """
    sys.stderr.write('Fetching')
    url = 'http://download.geonames.org/export/zip/%s.zip' % code
    response = urllib2.urlopen(url)
    try:
        archive = response.read()
    finally:
        # Python 2 responses are not context managers, so close explicitly
        # to release the connection even when read() fails.
        response.close()
    with zipfile.ZipFile(BytesIO(archive)) as zf:
        data = zf.read('%s.txt' % code)
    # `print >> sys.stderr, '.'` raises TypeError under Python 3.
    sys.stderr.write('.\n')
    return data


def import_(data):
    """Create one ``country.zip`` record per row of the tab-separated ``data``.

    ``data`` is the content returned by ``fetch``.  Each row is split into
    the columns named by ``_fieldnames``; the ``country`` column selects the
    ``country.country`` record, ``country``-``code1`` selects the optional
    ``country.subdivision`` record, and ``postal`` / ``place`` fill the zip
    and city of the new record.

    Raises ValueError if a row's country code does not match exactly one
    country.  A subdivision that does not match exactly one record is
    stored as empty instead.
    """
    # Local import: only needed for the Python 3 text path below.
    from io import StringIO

    Zip = Model.get('country.zip')
    Country = Model.get('country.country')
    Subdivision = Model.get('country.subdivision')
    # `print >> sys.stderr, ...` raises TypeError under Python 3.
    sys.stderr.write('Importing\n')

    countries = {}

    def get_country(code):
        country = countries.get(code)
        if not country:
            country, = Country.find([('code', '=', code)])
            countries[code] = country
        return country

    subdivisions = {}

    def get_subdivision(country, code):
        code = '%s-%s' % (country, code)
        if code not in subdivisions:
            try:
                subdivision, = Subdivision.find([('code', '=', code)])
            except ValueError:
                # Zero or several matches: no subdivision for this row.
                subdivision = None
            # Cache misses too, so an unknown code is queried only once
            # instead of once per row.
            subdivisions[code] = subdivision
        return subdivisions[code]

    if bytes is str:
        # Python 2: the csv module works on byte strings.
        stream = BytesIO(data)
    else:
        # Python 3: the csv module requires text.  GeoNames documents its
        # dumps as UTF-8 encoded.
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        stream = StringIO(data, newline='')

    if ProgressBar:
        pbar = ProgressBar(
            widgets=[SimpleProgress(), Bar(), ETA()])
    else:
        pbar = iter
    # The rows are materialized in a list so the progress bar can know the
    # total number of rows.
    rows = list(csv.DictReader(
            stream, fieldnames=_fieldnames, delimiter='\t'))
    for row in pbar(rows):
        country = get_country(row['country'])
        subdivision = get_subdivision(row['country'], row['code1'])
        Zip(country=country, subdivision=subdivision, zip=row['postal'],
            city=row['place']).save()

_fieldnames = ['country', 'postal', 'place', 'name1', 'code1',
    'name2', 'code2', 'name3', 'code3', 'latitude', 'longitude', 'accuracy']


def main(database, codes, config_file=None):
    """Replace the zip codes of each country in ``codes``.

    ``database`` and ``config_file`` are handed to
    ``proteus.config.set_trytond``.  For every code (upper-cased before
    use) the data is downloaded, the existing zip records of that country
    are deleted and the downloaded rows are imported.
    """
    config.set_trytond(database, config_file=config_file)

    for code in codes:
        # `print >> sys.stderr, code` raises TypeError under Python 3.
        sys.stderr.write('%s\n' % code)
        code = code.upper()
        # Download first: if the fetch fails (network error, unknown code)
        # the existing records must not have been deleted already.
        data = fetch(code)
        clean(code)
        import_(data)

if __name__ == '__main__':
    # Command line entry point: one or more country codes, plus the
    # database name (mandatory) and an optional trytond config file.
    arg_parser = ArgumentParser()
    arg_parser.add_argument('-d', '--database', dest='database')
    arg_parser.add_argument(
        '-c', '--config', dest='config_file',
        help='the trytond config file')
    arg_parser.add_argument('codes', nargs='+')

    options = arg_parser.parse_args()
    if not options.database:
        # parser.error() prints the usage and exits with status 2.
        arg_parser.error('Missing database')
    main(options.database, options.codes, options.config_file)