This file is indexed.

/usr/lib/python3/dist-packages/common/utils.py is in ctdconverter 2.0-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#!/usr/bin/env python
# encoding: utf-8
import ntpath
import os

from lxml import etree
from .logger import info, error, warning

from common.exceptions import ApplicationException
from CTDopts.CTDopts import CTDModel, ParameterGroup


MESSAGE_INDENTATION_INCREMENT = 2


# simple struct-class containing a tuple with input/output location and the in-memory CTDModel
class ParsedCTD:
    def __init__(self, ctd_model=None, input_file=None, suggested_output_file=None):
        self.ctd_model = ctd_model
        self.input_file = input_file
        self.suggested_output_file = suggested_output_file


class ParameterHardcoder:
    def __init__(self):
        # map whose keys are the composite names of tools and parameters in the following pattern:
        # [ToolName][separator][ParameterName] -> HardcodedValue
        # if the parameter applies to all tools, then the following pattern is used:
        # [ParameterName] -> HardcodedValue

        # examples (assuming separator is '#'):
        # threads -> 24
        # XtandemAdapter#adapter -> xtandem.exe
        # adapter -> adapter.exe
        self.separator = "!"
        self.parameter_map = {}

    # the most specific value will be returned in case of overlap
    def get_hardcoded_value(self, parameter_name, tool_name):
        # look for the value that would apply for all tools
        generic_value = self.parameter_map.get(parameter_name, None)
        specific_value = self.parameter_map.get(self.build_key(parameter_name, tool_name), None)
        if specific_value is not None:
            return specific_value

        return generic_value

    def register_parameter(self, parameter_name, parameter_value, tool_name=None):
        self.parameter_map[self.build_key(parameter_name, tool_name)] = parameter_value

    def build_key(self, parameter_name, tool_name):
        if tool_name is None:
            return parameter_name
        return "%s%s%s" % (parameter_name, self.separator, tool_name)


def validate_path_exists(path):
    if not os.path.isfile(path) or not os.path.exists(path):
        raise ApplicationException("The provided path (%s) does not exist or is not a valid file path." % path)


def validate_argument_is_directory(args, argument_name):
    file_name = getattr(args, argument_name)
    if file_name is not None and os.path.isdir(file_name):
        raise ApplicationException("The provided output file name (%s) points to a directory." % file_name)


def validate_argument_is_valid_path(args, argument_name):
    paths_to_check = []
    # check if we are handling a single file or a list of files
    member_value = getattr(args, argument_name)
    if member_value is not None:
        if isinstance(member_value, list):
            for file_name in member_value:
                paths_to_check.append(str(file_name).strip())
        else:
            paths_to_check.append(str(member_value).strip())

        for path_to_check in paths_to_check:
            validate_path_exists(path_to_check)


# taken from
# http://stackoverflow.com/questions/8384737/python-extract-file-name-from-path-no-matter-what-the-os-path-format
def get_filename(path):
    head, tail = ntpath.split(path)
    return tail or ntpath.basename(head)


def get_filename_without_suffix(path):
    root, ext = os.path.splitext(os.path.basename(path))
    return root


def parse_input_ctds(xsd_location, input_ctds, output_destination, output_file_extension):
    is_converting_multiple_ctds = len(input_ctds) > 1
    parsed_ctds = []
    schema = None
    if xsd_location is not None:
        try:
            info("Loading validation schema from %s" % xsd_location, 0)
            schema = etree.XMLSchema(etree.parse(xsd_location))
        except Exception as e:
            error("Could not load validation schema %s. Reason: %s" % (xsd_location, str(e)), 0)
    else:
        warning("Validation against a schema has not been enabled.", 0)

    for input_ctd in input_ctds:
        if schema is not None:
            validate_against_schema(input_ctd, schema)

        output_file = output_destination
        # if multiple inputs are being converted, we need to generate a different output_file for each input
        if is_converting_multiple_ctds:
            output_file = os.path.join(output_file, get_filename_without_suffix(input_ctd) + "." + output_file_extension)
        info("Parsing %s" % input_ctd)
        parsed_ctds.append(ParsedCTD(CTDModel(from_file=input_ctd), input_ctd, output_file))

    return parsed_ctds


def flatten_list_of_lists(args, list_name):
    setattr(args, list_name, [item for sub_list in getattr(args, list_name) for item in sub_list])


def validate_against_schema(ctd_file, schema):
    try:
        parser = etree.XMLParser(schema=schema)
        etree.parse(ctd_file, parser=parser)
    except etree.XMLSyntaxError as e:
        raise ApplicationException("Invalid CTD file %s. Reason: %s" % (ctd_file, str(e)))


def add_common_parameters(parser, version, last_updated):
    parser.add_argument("FORMAT", default=None, help="Output format (mandatory). Can be one of: cwl, galaxy.")
    parser.add_argument("-i", "--input", dest="input_files", default=[], required=True, nargs="+", action="append",
                        help="List of CTD files to convert.")
    parser.add_argument("-o", "--output-destination", dest="output_destination", required=True,
                        help="If multiple input files are given, then a folder in which all converted "
                             "files will be generated is expected; "
                             "if a single input file is given, then a destination file is expected.")
    parser.add_argument("-x", "--default-executable-path", dest="default_executable_path",
                        help="Use this executable path when <executablePath> is not present in the CTD",
                        default=None, required=False)
    parser.add_argument("-b", "--blacklist-parameters", dest="blacklisted_parameters", default=[], nargs="+",
                        action="append",
                        help="List of parameters that will be ignored and won't appear on the galaxy stub",
                        required=False)
    parser.add_argument("-p", "--hardcoded-parameters", dest="hardcoded_parameters", default=None, required=False,
                        help="File containing hardcoded values for the given parameters. Run with '-h' or '--help' "
                             "to see a brief example on the format of this file.")
    parser.add_argument("-V", "--validation-schema", dest="xsd_location", default=None, required=False,
                        help="Location of the schema to use to validate CTDs. If not provided, no schema validation "
                             "will take place.")

    # TODO: add verbosity, maybe?
    program_version = "v%s" % version
    program_build_date = str(last_updated)
    program_version_message = "%%(prog)s %s (%s)" % (program_version, program_build_date)
    parser.add_argument("-v", "--version", action="version", version=program_version_message)


def parse_hardcoded_parameters(hardcoded_parameters_file):
    parameter_hardcoder = ParameterHardcoder()
    if hardcoded_parameters_file is not None:
        line_number = 0
        with open(hardcoded_parameters_file) as f:
            for line in f:
                line_number += 1
                if line is None or not line.strip() or line.strip().startswith("#"):
                    pass
                else:
                    # the third column must not be obtained as a whole, and not split
                    parsed_hardcoded_parameter = line.strip().split(None, 2)
                    # valid lines contain two or three columns
                    if len(parsed_hardcoded_parameter) != 2 and len(parsed_hardcoded_parameter) != 3:
                        warning("Invalid line at line number %d of the given hardcoded parameters file. Line will be"
                                "ignored:\n%s" % (line_number, line), 0)
                        continue

                    parameter_name = parsed_hardcoded_parameter[0]
                    hardcoded_value = parsed_hardcoded_parameter[1]
                    tool_names = None
                    if len(parsed_hardcoded_parameter) == 3:
                        tool_names = parsed_hardcoded_parameter[2].split(',')
                    if tool_names:
                        for tool_name in tool_names:
                            parameter_hardcoder.register_parameter(parameter_name, hardcoded_value, tool_name.strip())
                    else:
                        parameter_hardcoder.register_parameter(parameter_name, hardcoded_value)

    return parameter_hardcoder


def extract_tool_help_text(ctd_model):
    manual = ""
    doc_url = None
    if "manual" in list(ctd_model.opt_attribs.keys()):
        manual += "%s\n\n" % ctd_model.opt_attribs["manual"]
    if "docurl" in list(ctd_model.opt_attribs.keys()):
        doc_url = ctd_model.opt_attribs["docurl"]

    help_text = "No help available"
    if manual is not None:
        help_text = manual
    if doc_url is not None:
        help_text = ("" if manual is None else manual) + "\nFor more information, visit %s" % doc_url

    return help_text


def extract_tool_executable_path(model, default_executable_path):
    # rules to build the executable path:
    # if executablePath is null, then use default_executable_path
    # if executablePath is null and executableName is null, then the name of the tool will be used
    # if executablePath is null and executableName is not null, then executableName will be used
    # if executablePath is not null and executableName is null,
    #   then executablePath and the name of the tool will be used
    # if executablePath is not null and executableName is not null, then both will be used

    # first, check if the model has executablePath / executableName defined
    executable_path = model.opt_attribs.get("executablePath", None)
    executable_name = model.opt_attribs.get("executableName", None)

    # check if we need to use the default_executable_path
    if executable_path is None:
        executable_path = default_executable_path

    # fix the executablePath to make sure that there is a '/' in the end
    if executable_path is not None:
        executable_path = executable_path.strip()
        if not executable_path.endswith("/"):
            executable_path += "/"

    # assume that we have all information present
    command = str(executable_path) + str(executable_name)
    if executable_path is None:
        if executable_name is None:
            command = model.name
        else:
            command = executable_name
    else:
        if executable_name is None:
            command = executable_path + model.name
    return command


def extract_and_flatten_parameters(ctd_model):
    parameters = []
    if len(ctd_model.parameters.parameters) > 0:
        # use this to put parameters that are to be processed
        # we know that CTDModel has one parent ParameterGroup
        pending = [ctd_model.parameters]
        while len(pending) > 0:
            # take one element from 'pending'
            parameter = pending.pop()
            if type(parameter) is not ParameterGroup:
                parameters.append(parameter)
            else:
                # append the first-level children of this ParameterGroup
                pending.extend(list(parameter.parameters.values()))
    # returned the reversed list of parameters (as it is now,
    # we have the last parameter in the CTD as first in the list)
    return reversed(parameters)


# some parameters are mapped to command line options, this method helps resolve those mappings, if any
def resolve_param_mapping(param, ctd_model):
    # go through all mappings and find if the given param appears as a reference name in a mapping element
    param_mapping = None
    for cli_element in ctd_model.cli:
        for mapping_element in cli_element.mappings:
            if mapping_element.reference_name == param.name:
                if param_mapping is not None:
                    warning("The parameter %s has more than one mapping in the <cli> section. "
                            "The first found mapping, %s, will be used." % (param.name, param_mapping), 1)
                else:
                    param_mapping = cli_element.option_identifier

    return param_mapping if param_mapping is not None else param.name


def _extract_param_cli_name(param, ctd_model):
    # we generate parameters with colons for subgroups, but not for the two topmost parents (OpenMS legacy)
    if type(param.parent) == ParameterGroup:
        if not hasattr(param.parent.parent, 'parent'):
            return resolve_param_mapping(param, ctd_model)
        elif not hasattr(param.parent.parent.parent, 'parent'):
            return resolve_param_mapping(param, ctd_model)
        else:
            if ctd_model.cli:
                warning("Using nested parameter sections (NODE elements) is not compatible with <cli>", 1)
            return extract_param_name(param.parent) + ":" + resolve_param_mapping(param, ctd_model)
    else:
        return resolve_param_mapping(param, ctd_model)


def extract_param_name(param):
    # we generate parameters with colons for subgroups, but not for the two topmost parents (OpenMS legacy)
    if type(param.parent) == ParameterGroup:
        if not hasattr(param.parent.parent, "parent"):
            return param.name
        elif not hasattr(param.parent.parent.parent, "parent"):
            return param.name
        else:
            return extract_param_name(param.parent) + ":" + param.name
    else:
        return param.name


def extract_command_line_prefix(param, ctd_model):
    param_name = extract_param_name(param)
    param_cli_name = _extract_param_cli_name(param, ctd_model)
    if param_name == param_cli_name:
        # there was no mapping, so for the cli name we will use a '-' in the prefix
        param_cli_name = "-" + param_name
    return param_cli_name