This file is indexed.

/usr/lib/python3/dist-packages/sqlalchemy/orm/sync.py is in python3-sqlalchemy 1.0.15+ds1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# orm/sync.py
# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""private module containing functions used for copying data
between instances based on join conditions.

"""

from . import exc, util as orm_util, attributes


def populate(source, source_mapper, dest, dest_mapper,
             synchronize_pairs, uowcommit, flag_cascaded_pks):
    source_dict = source.dict
    dest_dict = dest.dict

    for l, r in synchronize_pairs:
        try:
            # inline of source_mapper._get_state_attr_by_column
            prop = source_mapper._columntoproperty[l]
            value = source.manager[prop.key].impl.get(source, source_dict,
                                                      attributes.PASSIVE_OFF)
        except exc.UnmappedColumnError:
            _raise_col_to_prop(False, source_mapper, l, dest_mapper, r)

        try:
            # inline of dest_mapper._set_state_attr_by_column
            prop = dest_mapper._columntoproperty[r]
            dest.manager[prop.key].impl.set(dest, dest_dict, value, None)
        except exc.UnmappedColumnError:
            _raise_col_to_prop(True, source_mapper, l, dest_mapper, r)

        # technically the "r.primary_key" check isn't
        # needed here, but we check for this condition to limit
        # how often this logic is invoked for memory/performance
        # reasons, since we only need this info for a primary key
        # destination.
        if flag_cascaded_pks and l.primary_key and \
                r.primary_key and \
                r.references(l):
            uowcommit.attributes[("pk_cascaded", dest, r)] = True


def bulk_populate_inherit_keys(
        source_dict, source_mapper, synchronize_pairs):
    # a simplified version of populate() used by bulk insert mode
    for l, r in synchronize_pairs:
        try:
            prop = source_mapper._columntoproperty[l]
            value = source_dict[prop.key]
        except exc.UnmappedColumnError:
            _raise_col_to_prop(False, source_mapper, l, source_mapper, r)

        try:
            prop = source_mapper._columntoproperty[r]
            source_dict[prop.key] = value
        except exc.UnmappedColumnError:
            _raise_col_to_prop(True, source_mapper, l, source_mapper, r)


def clear(dest, dest_mapper, synchronize_pairs):
    for l, r in synchronize_pairs:
        if r.primary_key and \
            dest_mapper._get_state_attr_by_column(
                dest, dest.dict, r) not in orm_util._none_set:

            raise AssertionError(
                "Dependency rule tried to blank-out primary key "
                "column '%s' on instance '%s'" %
                (r, orm_util.state_str(dest))
            )
        try:
            dest_mapper._set_state_attr_by_column(dest, dest.dict, r, None)
        except exc.UnmappedColumnError:
            _raise_col_to_prop(True, None, l, dest_mapper, r)


def update(source, source_mapper, dest, old_prefix, synchronize_pairs):
    for l, r in synchronize_pairs:
        try:
            oldvalue = source_mapper._get_committed_attr_by_column(
                source.obj(), l)
            value = source_mapper._get_state_attr_by_column(
                source, source.dict, l, passive=attributes.PASSIVE_OFF)
        except exc.UnmappedColumnError:
            _raise_col_to_prop(False, source_mapper, l, None, r)
        dest[r.key] = value
        dest[old_prefix + r.key] = oldvalue


def populate_dict(source, source_mapper, dict_, synchronize_pairs):
    for l, r in synchronize_pairs:
        try:
            value = source_mapper._get_state_attr_by_column(
                source, source.dict, l, passive=attributes.PASSIVE_OFF)
        except exc.UnmappedColumnError:
            _raise_col_to_prop(False, source_mapper, l, None, r)

        dict_[r.key] = value


def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
    """return true if the source object has changes from an old to a
    new value on the given synchronize pairs

    """
    for l, r in synchronize_pairs:
        try:
            prop = source_mapper._columntoproperty[l]
        except exc.UnmappedColumnError:
            _raise_col_to_prop(False, source_mapper, l, None, r)
        history = uowcommit.get_attribute_history(
            source, prop.key, attributes.PASSIVE_NO_INITIALIZE)
        if bool(history.deleted):
            return True
    else:
        return False


def _raise_col_to_prop(isdest, source_mapper, source_column,
                       dest_mapper, dest_column):
    if isdest:
        raise exc.UnmappedColumnError(
            "Can't execute sync rule for "
            "destination column '%s'; mapper '%s' does not map "
            "this column.  Try using an explicit `foreign_keys` "
            "collection which does not include this column (or use "
            "a viewonly=True relation)." % (dest_column, dest_mapper))
    else:
        raise exc.UnmappedColumnError(
            "Can't execute sync rule for "
            "source column '%s'; mapper '%s' does not map this "
            "column.  Try using an explicit `foreign_keys` "
            "collection which does not include destination column "
            "'%s' (or use a viewonly=True relation)." %
            (source_column, source_mapper, dest_column))