This file is indexed.

/usr/lib/python3/dist-packages/lib389/changelog.py is in python3-lib389 1.3.7.10-1ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2015 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---

import os
import ldap

from lib389._constants import *
from lib389.properties import *
from lib389 import DirSrv, Entry, InvalidArgumentError


class Changelog(object):
    """An object that helps to work with changelog entry

    Lookups of the names listed in ``proxied_methods`` are forwarded to the
    wrapped connection.

    :param conn: An instance
    :type conn: lib389.DirSrv
    """

    # Attribute names that __getattr__ forwards to the wrapped connection.
    proxied_methods = 'search_s getEntry'.split()

    def __init__(self, conn):
        self.conn = conn
        self.log = conn.log

    def __getattr__(self, name):
        """Forward the proxied method names to the wrapped connection.

        Python only calls this hook when normal attribute lookup has failed.

        :param name: Name of the attribute being looked up
        :type name: str

        :returns: The attribute of the same name on ``self.conn``
        :raises: AttributeError - if name is not one of ``proxied_methods``
        """

        if name in Changelog.proxied_methods:
            # Regular lookup on the connection finds both instance attributes
            # and methods defined on its class.
            return getattr(self.conn, name)
        # Falling through would return None implicitly and make every
        # attribute name look like it exists (hasattr() always True).
        raise AttributeError("%r object has no attribute %r" %
                             (type(self).__name__, name))

    def list(self, suffix=None, changelogdn=DN_CHANGELOG):
        """Get a changelog entry using changelogdn parameter

        :param suffix: Not used
        :type suffix: str
        :param changelogdn: DN of the changelog entry, DN_CHANGELOG by default
        :type changelogdn: str

        :returns: Search result for the changelog entry.
                  Empty list if nothing was found
        """

        base = changelogdn
        filtr = "(objectclass=extensibleobject)"

        # now do the effective search
        try:
            ents = self.conn.search_s(base, ldap.SCOPE_BASE, filtr)
        except ldap.NO_SUCH_OBJECT:
            # The base entry does not exist, so return an empty list
            # instead of propagating the error
            ents = []
        return ents

    def create(self, dbname=DEFAULT_CHANGELOG_DB):
        """Add and return the replication changelog entry.

        The changelog directory is built as a sibling of the connection's
        ``dbdir`` and is also stored on the connection as ``changelogdir``.
        If the entry already exists, a warning is logged and the DN is
        still returned.

        :param dbname: Database name, it will be used for creating
                       a changelog dir path
        :type dbname: str

        :returns: DN of the changelog entry (DN_CHANGELOG)
        """

        dn = DN_CHANGELOG
        # The value of the first RDN ("attr=value,...") is the changelog name
        changelog_name = dn.split(",")[0].split("=", 1)[1]
        dirpath = os.path.join(os.path.dirname(self.conn.dbdir), dbname)
        entry = Entry(dn)
        entry.update({
            'objectclass': ("top", "extensibleobject"),
            CHANGELOG_PROPNAME_TO_ATTRNAME[CHANGELOG_NAME]: changelog_name,
            CHANGELOG_PROPNAME_TO_ATTRNAME[CHANGELOG_DIR]: dirpath
        })
        self.log.debug("adding changelog entry: %r", entry)
        self.conn.changelogdir = dirpath
        try:
            self.conn.add_s(entry)
        except ldap.ALREADY_EXISTS:
            self.log.warning("entry %s already exists", dn)
        return dn

    def delete(self):
        """Delete the changelog entry

        :raises: LDAPError - failed to delete changelog entry
        """

        try:
            self.conn.delete_s(DN_CHANGELOG)
        except ldap.LDAPError as e:
            # Log for diagnostics, then let the caller handle the failure
            self.log.error('Failed to delete the changelog: ' + str(e))
            raise

    def setProperties(self, changelogdn=None, properties=None):
        """Set the properties of the changelog entry.

        A property name may be prefixed with '+' to add a value or '-' to
        delete a value; without a prefix the value is replaced.

        :param changelogdn: DN of the changelog
        :type changelogdn: str
        :param properties: Dictionary of properties
        :type properties: dict

        :returns: None
        :raises: - ValueError - if invalid properties
                 - ValueError - if changelog entry is not found
                 - InvalidArgumentError - changelog DN is missing
                 - InvalidArgumentError - properties are missing

        :supported properties are:
                CHANGELOG_NAME, CHANGELOG_DIR, CHANGELOG_MAXAGE,
                CHANGELOG_MAXENTRIES, CHANGELOG_TRIM_INTERVAL,
                CHANGELOG_COMPACT_INTV, CHANGELOG_CONCURRENT_WRITES,
                CHANGELOG_ENCRYPT_ALG, CHANGELOG_SYM_KEY
        """

        if not changelogdn:
            raise InvalidArgumentError("changelog DN is missing")

        # Fail early with a clear error instead of a TypeError when the
        # default None is iterated below.
        if properties is None:
            raise InvalidArgumentError("properties are missing")

        ents = self.conn.changelog.list(changelogdn=changelogdn)
        if len(ents) != 1:
            raise ValueError("Changelog entry not found: %s" % changelogdn)

        # check that all the given properties are valid before building
        # any modification
        for prop in properties:
            # inProperties is given the name with its optional +/- prefix
            if not inProperties(prop, CHANGELOG_PROPNAME_TO_ATTRNAME):
                raise ValueError("unknown property: %s" % prop)

        # build the MODS
        mods = []
        for prop in properties:
            # take the operation type from the property name
            val = rawProperty(prop)
            if str(prop).startswith('+'):
                op = ldap.MOD_ADD
            elif str(prop).startswith('-'):
                op = ldap.MOD_DELETE
            else:
                op = ldap.MOD_REPLACE

            mods.append((op, CHANGELOG_PROPNAME_TO_ATTRNAME[val],
                         properties[prop]))

        # that is fine now to apply the MOD
        self.conn.modify_s(ents[0].dn, mods)

    def getProperties(self, changelogdn=None, properties=None):
        """Get a dictionary of the requested properties.
        If properties parameter is missing, it returns all the properties.

        Not implemented yet.

        :param changelogdn: DN of the changelog
        :type changelogdn: str
        :param properties: Names of the requested properties
        :type properties: list

        :raises: NotImplementedError - always
        """

        # NotImplemented is a comparison sentinel, not an exception; raising
        # it produces a confusing TypeError on Python 3.
        raise NotImplementedError