This file is indexed.

/usr/lib/python3/dist-packages/sqlobject/views.py is in python3-sqlobject 3.1.0+dfsg-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from .main import SQLObject
from .sqlbuilder import AND, Alias, ColumnAS, LEFTJOINOn, \
    NoDefault, SQLCall, SQLConstant, SQLObjectField, SQLObjectTable, SQLOp, \
    Select, sqlrepr


class ViewSQLObjectField(SQLObjectField):
    def __init__(self, alias, *arg):
        SQLObjectField.__init__(self, *arg)
        self.alias = alias

    def __sqlrepr__(self, db):
        return self.alias + "." + self.fieldName

    def tablesUsedImmediate(self):
        return [self.tableName]


class ViewSQLObjectTable(SQLObjectTable):
    FieldClass = ViewSQLObjectField

    def __getattr__(self, attr):
        if attr == 'sqlmeta':
            raise AttributeError
        return SQLObjectTable.__getattr__(self, attr)

    def _getattrFromID(self, attr):
        return self.FieldClass(self.soClass.sqlmeta.alias, self.tableName,
                               'id', attr, self.soClass, None)

    def _getattrFromColumn(self, column, attr):
        return self.FieldClass(self.soClass.sqlmeta.alias, self.tableName,
                               column.name, attr, self.soClass, column)


class ViewSQLObject(SQLObject):
    """
    A SQLObject class that derives all it's values from other SQLObject
    classes. Columns on subclasses should use SQLBuilder constructs for dbName,
    and sqlmeta should specify:

    * idName as a SQLBuilder construction
    * clause as SQLBuilder clause for specifying join conditions
      or other restrictions
    * table as an optional alternate name for the class alias

    See test_views.py for simple examples.
    """

    def __classinit__(cls, new_attrs):
        SQLObject.__classinit__(cls, new_attrs)
        # like is_base
        if cls.__name__ != 'ViewSQLObject':
            dbName = hasattr(cls, '_connection') and \
                (cls._connection and cls._connection.dbName) or None

            if getattr(cls.sqlmeta, 'table', None):
                cls.sqlmeta.alias = cls.sqlmeta.table
            else:
                cls.sqlmeta.alias = \
                    cls.sqlmeta.style.pythonClassToDBTable(cls.__name__)
            alias = cls.sqlmeta.alias
            columns = [ColumnAS(cls.sqlmeta.idName, 'id')]
            # {sqlrepr-key: [restriction, *aggregate-column]}
            aggregates = {'': [None]}
            inverseColumns = dict(
                [(y, x) for x, y in cls.sqlmeta.columns.items()])
            for col in cls.sqlmeta.columnList:
                n = inverseColumns[col]
                ascol = ColumnAS(col.dbName, n)
                if isAggregate(col.dbName):
                    restriction = getattr(col, 'aggregateClause', None)
                    if restriction:
                        restrictkey = sqlrepr(restriction, dbName)
                        aggregates[restrictkey] = \
                            aggregates.get(restrictkey, [restriction]) + \
                            [ascol]
                    else:
                        aggregates[''].append(ascol)
                else:
                    columns.append(ascol)

            metajoin = getattr(cls.sqlmeta, 'join', NoDefault)
            clause = getattr(cls.sqlmeta, 'clause', NoDefault)
            select = Select(columns,
                            distinct=True,
                            # @@ LDO check if this really mattered
                            # for performance
                            # @@ Postgres (and MySQL?) extension!
                            # distinctOn=cls.sqlmeta.idName,
                            join=metajoin,
                            clause=clause)

            aggregates = aggregates.values()

            if aggregates != [[None]]:
                join = []
                last_alias = "%s_base" % alias
                last_id = "id"
                last = Alias(select, last_alias)
                columns = [
                    ColumnAS(SQLConstant("%s.%s" % (last_alias, x.expr2)),
                             x.expr2) for x in columns]

                for i, agg in enumerate(aggregates):
                    restriction = agg[0]
                    if restriction is None:
                        restriction = clause
                    else:
                        restriction = AND(clause, restriction)
                    agg = agg[1:]
                    agg_alias = "%s_%s" % (alias, i)
                    agg_id = '%s_id' % agg_alias
                    if not last.q.alias.endswith('base'):
                        last = None
                    new_alias = Alias(Select(
                        [ColumnAS(cls.sqlmeta.idName, agg_id)] + agg,
                        groupBy=cls.sqlmeta.idName,
                        join=metajoin,
                        clause=restriction),
                        agg_alias)
                    agg_join = LEFTJOINOn(last, new_alias,
                                          "%s.%s = %s.%s" % (
                                              last_alias, last_id,
                                              agg_alias, agg_id))

                    join.append(agg_join)
                    for col in agg:
                        columns.append(
                            ColumnAS(SQLConstant(
                                "%s.%s" % (agg_alias, col.expr2)),
                                col.expr2))

                    last = new_alias
                    last_alias = agg_alias
                    last_id = agg_id
                select = Select(columns,
                                join=join)

            cls.sqlmeta.table = Alias(select, alias)
            cls.q = ViewSQLObjectTable(cls)
            for n, col in cls.sqlmeta.columns.items():
                col.dbName = n


def isAggregate(expr):
    if isinstance(expr, SQLCall):
        return True
    if isinstance(expr, SQLOp):
        return isAggregate(expr.expr1) or isAggregate(expr.expr2)
    return False