This file is indexed.

/usr/share/pyshared/asrun/rex.py is in code-aster-run 1.13.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
# -*- coding: utf-8 -*-

# ==============================================================================
# COPYRIGHT (C) 1991 - 2003  EDF R&D                  WWW.CODE-ASTER.ORG
# THIS PROGRAM IS FREE SOFTWARE; YOU CAN REDISTRIBUTE IT AND/OR MODIFY
# IT UNDER THE TERMS OF THE GNU GENERAL PUBLIC LICENSE AS PUBLISHED BY
# THE FREE SOFTWARE FOUNDATION; EITHER VERSION 2 OF THE LICENSE, OR
# (AT YOUR OPTION) ANY LATER VERSION.
#
# THIS PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT
# WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF
# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. SEE THE GNU
# GENERAL PUBLIC LICENSE FOR MORE DETAILS.
#
# YOU SHOULD HAVE RECEIVED A COPY OF THE GNU GENERAL PUBLIC LICENSE
# ALONG WITH THIS PROGRAM; IF NOT, WRITE TO EDF R&D CODE_ASTER,
#    1 AVENUE DU GENERAL DE GAULLE, 92141 CLAMART CEDEX, FRANCE.
# ==============================================================================

"""
Give methods to :
    - create a new issue in the tracker database,
    - close an issue,
    - extract messages from the tracker to write a histor file...
Methods are called by an AsterRun object.
"""

import os
import os.path as osp
import re

from asrun.installation import aster_root, confdir
from asrun.common.i18n import _
from asrun.mystring     import print3, ufmt, convert, cut_long_lines
from asrun.maintenance  import GetVersion
from asrun.common_func  import get_tmpname
from asrun.common.sysutils import on_linux
from asrun.common.utils import now


# The issue tracker interface needs the database connector, the schema and
# the histor modules. A failing import is not an error at this point: the
# outcome is recorded in `imports_succeed`, which Creer, Histor and Solder
# check before using the database.
try:
    import asrun.myconnect as db
    from asrun.schema    import ReadDB, TYPE, USER, PRODUIT, STATUS, ISSUE, MSG, \
                               ISSUE_MESSAGES, YES, NO, ReadDBError, WriteDBError, \
                               mysql_date_fmt, ISSUE_NOSY
    from asrun.histor    import InitHistor
    imports_succeed = True
except ImportError:
    imports_succeed = False

# encoding given to `convert()` for the title and text of a new issue (see Creer)
db_encoding  = 'utf-8'


def in_adm_group(run):
    """Tell if the current user has admin rights.

    The name of the group returned by `os.getgid()` is compared to the value
    of 'agla_adm_group' read from `run`. The answer is always False when
    `on_linux()` is false.
    """
    if not on_linux():
        return False
    # imported here because it is only used on the Linux branch
    import grp
    group_name = grp.getgrgid(os.getgid())[0]
    return group_name == run.get('agla_adm_group')


def parse_issue_file(content):
    """Parse the content of the issue.

    Each field is searched in `content` between the markers ``@NAME@`` and
    ``@FINNAME@``; the value may span several lines and is stripped.

    content : text of the issue file.
    Return a dict {field name : value}. Fields which are not found, or whose
    value is 'Non_défini', are not stored in the dict.
    """
    l_champs = ('NOMUSER', 'MAILUSER', 'UNIUSER',
                'TITRE', 'DATE1', 'DATE2', 'VERSION', 'TYPFIC', 'TEXTE',
                'FICASS')
    dinf = {}
    for ch in l_champs:
        # raw string: '@' needs no escaping and '\@' is an invalid escape
        # sequence in a normal string literal
        mat = re.search(r'@%s@(.*)@FIN%s@' % (ch, ch), content,
                        re.MULTILINE | re.DOTALL)
        if mat is None:
            continue
        value = mat.group(1).strip()
        if value != 'Non_défini':
            dinf[ch] = value
    return dinf


def SetParser(run):
    """Declare the actions and options of the issue tracker interface.

    Three actions ('create_issue', 'extract_histor', 'close_issue') and three
    options ('--status', '--format', '--all_msg') are described and passed to
    `run.SetActions`.
    run : AsterRun object which manages the execution
    """
    # --- actions: method to call, syntax and help message
    create_issue = {
        'method' : Creer,
        'syntax' : 'issue_file export_file',
        'help'   : _(u'Insert a new entry in the issue tracking system and '
                      'copy attached files if an export file is provided'),
    }
    extract_histor = {
        'method' : Histor,
        'syntax' : '[--status=STAT] [--format=FORM] [--all_msg] input_file histor',
        'help'   : _(u'Extract the content of issues listed in `input_file` to `histor`'),
    }
    close_issue = {
        'method' : Solder,
        'syntax' : '--vers=VERS histor',
        'help'   : _(u'Fill "corrVdev" or "corrVexpl" field (depends on VERS) in issues found in `histor` and eventually close them'),
    }

    # --- options: positional and keyword arguments describing each option
    opt_status = {
        'args'   : ('--status', ),
        'kwargs' : dict(
            action='store',
            default='all',
            metavar='STAT',
            choices=('all', 'resolu', 'valide_EDA', 'attente_doc', 'ferme'),
            dest='status',
            help=_(u'raise an error if issues are not in this status'),
        ),
    }
    opt_format = {
        'args'   : ('--format', ),
        'kwargs' : dict(
            action='store',
            default='text',
            metavar='FORM',
            choices=('text', 'html', 'fsq'),
            dest='format',
            help=_(u'format of generated histor file (text or html)'),
        ),
    }
    opt_all_msg = {
        'args'   : ('--all_msg', ),
        'kwargs' : dict(
            action='store_true',
            default=False,
            dest='all_msg',
            help=_(u'retrieve all the messages of issues'),
        ),
    }

    group_title = _(u'Options for issue tracker interface')
    run.SetActions(
        actions_descr={
            'create_issue'   : create_issue,
            'extract_histor' : extract_histor,
            'close_issue'    : close_issue,
        },
        actions_order=['create_issue', 'close_issue', 'extract_histor'],
        group_options=True,
        group_title=group_title,
        actions_group_title=False,
        options_descr={
            'status'  : opt_status,
            'format'  : opt_format,
            'all_msg' : opt_all_msg,
        },
    )


def Creer(run, *args):
    """Create a new issue in REX database.

    args[0] : issue file, parsed by `parse_issue_file`.
    args[1] : export file; required when the issue file has a 'FICASS' field
              (study files attached to the issue).

    The issue, its first message and the link between them are written in
    the database, the user is added to the nosy list, the message text is
    stored with 'roundup_cp_uid' and the study files with 'as_rex_prof'.
    The identifiers are printed as 'INDEX=... MESSAGE=...'.
    Errors are reported through `run.Mess` with '<F>_...' codes.
    """
    # backward compatibility: 2nd argument was optional in astk <= 1.8.3
    if len(args) < 1 or len(args) > 2:
        run.parser.error(_(u"'--%s' requires one or two arguments") % run.current_action)

    on_machref = run.get('rep_agla', 'local') != 'local'
    if not on_machref:
        run.Mess(_(u'Only available on the AGLA machine'), '<F>_AGLA_ERROR')

    # 0. check imports
    if not imports_succeed:
        run.Mess(_(u'Imports of REX interface failed.'), '<F>_IMPORT_ERROR')

    # 1. copy issue file
    ffich = get_tmpname(run, run['tmp_user'], basename='rex_fich')
    run.Copy(ffich, args[0], niverr='<F>_COPYFILE')

    # 1b. parse issue file
    with open(ffich, 'r') as fobj:
        content = fobj.read()
    d = parse_issue_file(content)
    if run['debug']:
        print3('Dict issue content : ', repr(d))

    fichetude = int(d.get('FICASS') is not None)
    if fichetude == 1:
        # check backward compatibility; explicit raise because an `assert`
        # statement is removed when Python runs with -O
        if len(args) < 2:
            raise AssertionError("inconsistent data")
        fprof = get_tmpname(run, run['tmp_user'], basename='rex_prof')
        run.Copy(fprof, args[1], niverr='<F>_PROFILE_COPY')

    # 3. open database connection
    # NOTE(review): `c` is used below even if the connection failed, so the
    # '<F>' message is presumably expected to stop the execution - confirm.
    try:
        c = db.CONNECT('REX', rcdir=confdir, verbose=run['debug'])
    except Exception as msg:
        run.Mess(msg, '<F>_DB_ERROR')

    typ = ReadDB(TYPE, c)
    d_typ = {
        'AL'   : typ['anomalie'],
        'EL'   : typ['evolution'],
        'AOM'  : typ['aide utilisation'],
        'AO'   : typ['anomalie'],
        'EO'   : typ['evolution'],
        'ED'   : typ['evolution'],
    }
    # a missing 'TYPFIC' field is reported like an unknown type
    typfic = d.get('TYPFIC')
    if typfic not in d_typ:
        run.Mess(_(u'Unknown issue type : %s') % typfic, '<F>_PARSE_ERROR')

    # 4. create new issue
    # 4.1. get fields from db
    login = run.system.getuser_host()[0]
    loginrex = login
    try:
        res = c.exe("""SELECT id, _username FROM _user WHERE _loginaster='%s' AND __retired__=0;""" % login)
        # IndexError here means that no user has this login
        loginrex = res[0][1]
        if len(res) > 1:
            run.Mess(_(u"More than one user has '%s' as login in REX database, " \
                    "'%s' is taken") % (login, loginrex),
                '<A>_MULTIPLE_USER')
    except (IndexError, db.MySQLError) as msg:
        run.Mess(str(msg))
        run.Mess(_(u'User %s unknown in REX database') % login,
                '<F>_UNKNOWN_USER')
    user = USER({'_username' : loginrex}, c)
    try:
        user.read()
    except ReadDBError:
        run.Mess(_(u'User %s unknown in REX database') % login,
                '<F>_UNKNOWN_USER')
    prod = PRODUIT({'_name' : 'Code_Aster'}, c)
    try:
        prod.read()
    except ReadDBError:
        run.Mess(_(u'Code_Aster product not found in database !'), '<F>_DB_ERROR')
    emis = STATUS({'_name' : 'emis'}, c)
    try:
        emis.read()
    except ReadDBError:
        run.Mess(_(u"Status 'emis' not found in database !"), '<F>_DB_ERROR')

    # 4.2. get version item
    # a missing 'VERSION' field is reported like an unknown version; the
    # message has no error code and the issue is created anyway
    version = d.get('VERSION')
    vers = prod.GetLinks().get(version)
    if vers is None:
        run.Mess(_(u"Version %s not found in database !") % version)

    # 4.3. fill fields
    date_now = now(datefmt=mysql_date_fmt, timefmt="")
    txtmsg = convert(d['TEXTE'], db_encoding)
    txtmsg = cut_long_lines(txtmsg, maxlen=100)
    issue = ISSUE({
            '_creator'  : user,
            '_produit'  : prod,
            '_status'   : emis,
            '_title'    : convert(d['TITRE'], db_encoding),
            '_type'     : d_typ[typfic],
            '_version'  : vers,
            '_fichetude' : fichetude,
        }, c)
    # only the first 255 characters are stored as summary, the complete text
    # is written in the message file (step 6)
    descr = MSG({'_author'   : user,
                '_creation' : date_now,
                '_creator'  : user,
                '_date'     : date_now,
                '_summary'  : txtmsg[:255], }, c)
    lien = ISSUE_MESSAGES({'linkid' : descr, 'nodeid' : issue}, c)

    # 4.4. insert issue in database
    try:
        lien.write()
    except WriteDBError:
        run.Mess(_(u'Insert issue failed'), '<F>_DB_ERROR')

    # 4.5. add user in nosy list
    nosy = ISSUE_NOSY({'linkid' : user['id'], 'nodeid' : issue['id']}, c)
    try:
        nosy.write()
    except WriteDBError:
        run.Mess(_(u'Add user to nosy list failed'), '<F>_DB_ERROR')

    # 5. get message and issue id
    numid = issue['id']
    msgid = descr['id']
    print3('INDEX=%s MESSAGE=%s' % (numid, msgid))

    # 6. copy message file
    # messages are grouped by thousands; `//` keeps an integer result with
    # any Python version
    repid = str(int(msgid) // 1000)
    fmsg = osp.join(run['tmp_user'], 'msg%s' % msgid)
    # the file must be closed before the external command reads it
    with open(fmsg, 'w') as fobj:
        fobj.write(txtmsg + '\n')
    cmd = [
        osp.join(run['rep_agla'], 'roundup_cp_uid'),
        'put',
        fmsg,
        '%s' % repid,
    ]
    iret, output = run.Shell(' '.join(cmd))
    if iret != 0:
        run.Mess(_(u'Error message: %s') % output)
        run.Mess(_(u'Error occurs during copying message file'), '<F>_COPY')

    # 7. study files
    if fichetude == 1:
        cmd = [
            osp.join(aster_root, 'ASTK', 'ASTK_SERV', 'bin', 'as_rex_prof'),
            fprof,
            '%06d' % numid,
        ]
        iret, output = run.Shell(' '.join(cmd))
        if iret != 0:
            run.Mess(_(u'Error occurs during copying study files'), '<F>_COPY')


def Histor(run, *args):
    """Extract the content of some issues from REX database.

    args[0] : input file; every number found in it is taken as an issue number.
    args[1] : destination of the histor file.

    The histor object returned by `build_histor` is written with `repr()`
    in a temporary file which is then copied to args[1].
    """
    if len(args) != 2:
        run.parser.error(_(u"'--%s' requires two arguments") % run.current_action)

    on_machref = run.get('rep_agla', 'local') != 'local'
    if not on_machref:
        run.Mess(_(u'Only available on the AGLA machine'), '<F>_AGLA_ERROR')

    # 0. check imports
    if not imports_succeed:
        run.Mess(_(u'Imports of REX interface failed.'), '<F>_IMPORT_ERROR')

    # 1. copy input file
    ffich = get_tmpname(run, run['tmp_user'], basename='hist_input')
    run.Copy(ffich, args[0], niverr='<F>_COPYFILE')

    # 2. read input file
    with open(ffich, 'r') as fobj:
        hist_content = fobj.read()
    l_nf = [int(n) for n in re.findall('([0-9]+)', hist_content)]

    # 3. open database connection
    try:
        c = db.CONNECT('REX', rcdir=confdir, verbose=run['debug'])
    except Exception as msg:
        run.Mess(msg, '<F>_DB_ERROR')

    # 4. read issues from database
    histor = build_histor(run, l_nf, c)

    # 5. copy histor file
    ffich = get_tmpname(run, run['tmp_user'], basename='hist_output')
    # the file must be closed (so flushed) before it is copied
    with open(ffich, 'w') as fobj:
        fobj.write(repr(histor))
    run.Copy(args[1], ffich, niverr='<F>_COPYFILE')
    run.Mess(_(u"Histor successfully generated."), 'OK')


def Solder(run, *args):
    """Fill corrVdev/corrVexpl fields and close issues found in a histor file.

    args[0] : histor file; the issues to close are those announced by a
              'RESTITUTION FICHE <number>' line.

    The version tag is either the value of 'aster_vers' when it is a number
    (ex. '2011.1') or the one returned by `GetVersion`. Issues are updated by
    `mark_as_closed`; then the directories of all the issues having the
    status 'ferme' are deleted from '<rep_rex>/emise'.
    Admin rights are required (see `in_adm_group`).
    """
    if not run.get('aster_vers'):
        run.parser.error(_(u"You must define 'default_vers' in 'aster' configuration file or use '--vers' option."))
    if len(args) != 1:
        run.parser.error(_(u"'--%s' requires one argument") % run.current_action)

    on_machref = run.get('rep_agla', 'local') != 'local'
    if not on_machref:
        run.Mess(_(u'Only available on the AGLA machine'), '<F>_AGLA_ERROR')
    try:
        is_adm = in_adm_group(run)
    except KeyError:
        # a KeyError during the check (grp.getgrgid raises it for an unknown
        # group id) is handled as insufficient privileges
        is_adm = False
    if not is_adm:
        run.Mess(_(u'insufficient privileges to close issues !'), '<F>_AGLA_ERROR')

    # 0. check imports
    if not imports_succeed:
        run.Mess(_(u'Imports of REX interface failed.'), '<F>_IMPORT_ERROR')

    # 1. read histor file
    ffich = get_tmpname(run, run['tmp_user'], basename='hist_input')
    run.Copy(ffich, args[0], niverr='<F>_COPYFILE')

    with open(ffich, 'r') as fobj:
        hist_content = fobj.read()
    l_nf = [int(n) for n in re.findall('RESTITUTION FICHE +([0-9]+)', hist_content)]
    if not l_nf:
        run.Mess(_(u'Incorrect file, no issue to close.'), '<F>_AGLA_ERROR')

    # 2. get version number and database fields to fill
    expl = False
    # also accept a version number (ex. '2011.1') instead of 'NEW10'
    if run['aster_vers'].replace('.', '').isdigit():
        tagv = run['aster_vers']
        run.Mess(ufmt(_(u'Close issues with version tag : %s'), tagv))
    else:
        run.Mess(_(u'Close issues in :'))
        # the returned code is not checked
        l_res = GetVersion(run, silent=False, vers=run['aster_vers'])[1]
        tagv = '.'.join(l_res[:3])
        expl = l_res[4]

    # 3. open database connection
    try:
        c = db.CONNECT('REX', rcdir=confdir, verbose=run['debug'])
    except Exception as msg:
        run.Mess(msg, '<F>_DB_ERROR')

    etat = _read_table(run, c, STATUS)
    # 4. read issues from database
    mark_as_closed(run, l_nf, c, tagv, expl)

    # 5. delete files of *all* closed issues
    # list of closed issues
    query = """SELECT id from _issue WHERE _status=%s;""" % etat['ferme']['id']
    try:
        res = c.exe(query)
    except db.MySQLError:
        run.Mess(ufmt(_("error executing a query to the database\n %s"), query), '<F>_DB_ERROR')
    l_ferm = set(row[0] for row in res)
    # list of existing directories (only those named by a number)
    rep_emise = osp.join(run['rep_rex'], "emise")
    l_dirs = set(int(name) for name in os.listdir(rep_emise)
                 if osp.isdir(osp.join(rep_emise, name)) and name.isdigit())
    # sorted to delete the directories in a reproducible order
    for num in sorted(l_dirs.intersection(l_ferm)):
        # NOTE(review): the directory is rebuilt as a 6 digits name, which
        # assumes that this is how the directories are named - confirm
        dname = "%06d" % num
        run.Delete(osp.join(rep_emise, dname))
        run.Mess(_(u"files of issue %s have been deleted.") % dname)

def _read_table(run, cnx, typ):
    """Return the result of `ReadDB(typ, cnx)`.
    run: AsterRun object, used to report the error
    cnx: connection object to database
    typ: type of the items to read (ex. STATUS, PRODUIT)
    A '<F>_DB_ERROR' message is emitted if ReadDBError is raised."""
    try:
        table = ReadDB(typ, cnx)
    except ReadDBError:
        run.Mess(_(u'Unable to read the table'), '<F>_DB_ERROR')
    return table

def mark_as_closed(run, l_nf, cnx, tagv, expl):
    """Fill the correction fields of a list of issues and close them if possible
    run: AsterRun object
    l_nf: list of issues numbers
    cnx: connection object to database
    tagv: tag inserted in issues
    expl: True if it concerns a stable version (exploitation)

    An issue which must also be solved in the other branch, and has no
    version tag for it yet, is not closed. In debug mode, issues are not
    written in the database."""
    etat = _read_table(run, cnx, STATUS)
    # the product table is read too, but its content is not used below
    _read_table(run, cnx, PRODUIT)

    # branch being closed (typv) and the other one (autr)
    if expl:
        typv, autr = 'expl', 'dev'
    else:
        typv, autr = 'dev', 'expl'
    fld_corr      = '_corrV' + typv
    fld_vers      = '_verCorrV' + typv
    fld_corr_autr = '_corrV' + autr
    fld_vers_autr = '_verCorrV' + autr

    req_status = 'valide_EDA'
    for numf in l_nf:
        # 4.1. read the issue
        try:
            issue = ISSUE(numf, cnx)
        except ReadDBError:
            run.Mess(_(u'Unable to read issue %s') % numf, '<E>_UNKNOWN_ISSUE')
            continue

        # 4.2. check issue values
        # an unexpected status is reported then forced to the required one
        status = issue['_status'].GetPrimValue()
        if status != req_status:
            run.Mess(_(u"Status of issue %s is not '%s' (%s)") \
                    % (numf, req_status, status), '<A>_UNEXPECTED_VALUE')
            issue['_status'] = etat[req_status]

        is_aster = issue['_produit'].GetPrimValue() == 'Code_Aster'
        if not is_aster:
            # other products: no correction expected in any branch
            issue[fld_corr] = NO
            issue[fld_corr_autr] = NO
        elif not issue[fld_corr] \
                and issue['_type'].GetPrimValue() != 'aide utilisation':
            issue[fld_corr] = YES
            run.Mess(_(u"issue %s should not been solved in version %s") \
                    % (numf, typv), '<A>_UNEXPECTED_VALUE')

        # 4.3. fill issue fields
        issue[fld_vers] = tagv

        # 4.4. close issue ?
        # still waiting for the other branch: a correction is required in it
        # and its version tag is empty
        if issue[fld_corr_autr] and issue[fld_vers_autr] in ('', None):
            run.Mess(_(u'issue %s must be solved in V%s too') % (numf, autr))
        else:
            new_status = etat['attente_doc']
            if not issue['_impactDoc']:
                new_status = etat['ferme']
            issue['_status'] = new_status
            run.Mess(_(u'issue %s is closed') % numf)

        # 4.5. write issue in database
        try:
            if run['debug']:
                issue.repr()
            else:
                issue.write(force=True)
        except WriteDBError:
            run.Mess(_(u'error occurs during writing issue'), '<F>_DB_ERROR')


def build_histor(run, l_nf, cnx):
    """Build an Histor instance from a list of issues
    run: AsterRun object ('format', 'rex_url', 'status', 'all_msg', 'rep_tmp'
         and 'rep_agla' values are used)
    l_nf: list of issues numbers
    cnx: connection object to database

    Issues are added in the order of `l_nf`; a user section is opened with
    `BeginUser` each time the assigned user changes and closed with
    `EndUser`. Issues which can not be read are reported and skipped.
    Return the histor object."""
    # 4. read issues from database
    prec_user = None
    histor = InitHistor(format=run['format'], url=run['rex_url'])
    for numf in l_nf:
        # 4.1. read the issue
        issue = ISSUE({'id' : numf}, cnx)
        try:
            issue.read()
        except ReadDBError:
            run.Mess(_(u'Unable to read issue %s') % numf, '<E>_UNKNOWN_ISSUE')
            continue

        # 4.2.
        # 4.2.1. check status
        status = issue['_status'].GetPrimValue()
        if status != run['status'] and run['status'] != 'all':
            run.Mess(_(u"Status of issue %s is not '%s' (%s)") \
                    % (numf, run['status'], status), '<E>_UNEXPECTED_VALUE')

        # 4.2.2. alarm on impactDoc : no

        # 4.3.1. user header
        # The current section is tracked with `prec_user` only: comparing
        # `numf` to the first item of the list would open a section again
        # when an issue number is given twice.
        assigned = issue['_assignedto']
        if prec_user is None:
            prec_user = assigned
            histor.BeginUser(prec_user)
        elif assigned is not None and assigned['id'] != prec_user['id']:
            histor.EndUser(prec_user)
            prec_user = assigned
            histor.BeginUser(prec_user)

        # 4.3.2. get message file of answer
        l_rep = []
        d_msg = issue.GetLinks()
        l_msgid = list(d_msg.keys())
        if run['all_msg']:
            # allow to print all messages, the last one first
            l_msgid = sorted(l_msgid, reverse=True)
        elif l_msgid:
            l_msgid = [max(l_msgid)]
        # else: an issue without any message is added with no text
        nb_msg = len(l_msgid)
        for i, msgid in enumerate(l_msgid):
            # messages are grouped by thousands; `//` keeps an integer
            # result with any Python version
            repid = str(int(msgid) // 1000)
            fmsg = osp.join(run['rep_tmp'], 'msg%s' % msgid)
            cmd = [
                osp.join(run['rep_agla'], 'roundup_cp_uid'),
                'get',
                '%s%smsg%s' % (repid, os.sep, msgid),
                run['rep_tmp'],
            ]
            iret, output = run.Shell(' '.join(cmd))
            if iret != 0 or not osp.exists(fmsg):
                # NOTE(review): the text announces that only the summary
                # will be printed, but the code is a '<F>' one - confirm
                # which behaviour is expected
                run.Mess(_(u'Error message: %s') % output)
                run.Mess(_(u'Error occurs during copying message file %s, ' \
                       'only the summary will be printed') % msgid, '<F>_COPY_MSG')
            else:
                with open(fmsg, 'r') as fobj:
                    txt = fobj.read()
                if nb_msg > 1:
                    l_rep.append(_(u'message %d :') % (nb_msg - i))
                l_rep.append(txt)

        # 4.3.3. add this issue to the histor object
        histor.AddIssue(issue, l_rep)

    # 4.3.4. close the last user section; done after the loop so that it is
    # not skipped when the last issue can not be read, and not done twice
    # when the last issue number is given several times
    if prec_user is not None:
        histor.EndUser(prec_user)
    return histor