This file is indexed.

/usr/lib/python3/dist-packages/certbot/plugins/disco.py is in python3-certbot 0.23.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""Utilities for plugins discovery and selection."""
import collections
import itertools
import logging
import pkg_resources
import six

from collections import OrderedDict

import zope.interface
import zope.interface.verify

from certbot import constants
from certbot import errors
from certbot import interfaces


logger = logging.getLogger(__name__)


class PluginEntryPoint(object):
    """Plugin entry point."""

    PREFIX_FREE_DISTRIBUTIONS = [
        "certbot",
        "certbot-apache",
        "certbot-dns-cloudflare",
        "certbot-dns-cloudxns",
        "certbot-dns-digitalocean",
        "certbot-dns-dnsimple",
        "certbot-dns-dnsmadeeasy",
        "certbot-dns-google",
        "certbot-dns-luadns",
        "certbot-dns-nsone",
        "certbot-dns-rfc2136",
        "certbot-dns-route53",
        "certbot-nginx",
    ]
    """Distributions for which prefix will be omitted."""

    # this object is mutable, don't allow it to be hashed!
    __hash__ = None  # type: ignore

    def __init__(self, entry_point):
        self.name = self.entry_point_to_plugin_name(entry_point)
        self.plugin_cls = entry_point.load()
        self.entry_point = entry_point
        self._initialized = None
        self._prepared = None

    @classmethod
    def entry_point_to_plugin_name(cls, entry_point):
        """Unique plugin name for an ``entry_point``"""
        if entry_point.dist.key in cls.PREFIX_FREE_DISTRIBUTIONS:
            return entry_point.name
        return entry_point.dist.key + ":" + entry_point.name

    @property
    def description(self):
        """Description of the plugin."""
        return self.plugin_cls.description

    @property
    def description_with_name(self):
        """Description with name. Handy for UI."""
        return "{0} ({1})".format(self.description, self.name)

    @property
    def long_description(self):
        """Long description of the plugin."""
        try:
            return self.plugin_cls.long_description
        except AttributeError:
            return self.description

    @property
    def hidden(self):
        """Should this plugin be hidden from UI?"""
        return getattr(self.plugin_cls, "hidden", False)

    def ifaces(self, *ifaces_groups):
        """Does plugin implements specified interface groups?"""
        return not ifaces_groups or any(
            all(iface.implementedBy(self.plugin_cls)
                for iface in ifaces)
            for ifaces in ifaces_groups)

    @property
    def initialized(self):
        """Has the plugin been initialized already?"""
        return self._initialized is not None

    def init(self, config=None):
        """Memoized plugin initialization."""
        if not self.initialized:
            self.entry_point.require()  # fetch extras!
            self._initialized = self.plugin_cls(config, self.name)
        return self._initialized

    def verify(self, ifaces):
        """Verify that the plugin conforms to the specified interfaces."""
        assert self.initialized
        for iface in ifaces:  # zope.interface.providedBy(plugin)
            try:
                zope.interface.verify.verifyObject(iface, self.init())
            except zope.interface.exceptions.BrokenImplementation as error:
                if iface.implementedBy(self.plugin_cls):
                    logger.debug(
                        "%s implements %s but object does not verify: %s",
                        self.plugin_cls, iface.__name__, error, exc_info=True)
                return False
        return True

    @property
    def prepared(self):
        """Has the plugin been prepared already?"""
        if not self.initialized:
            logger.debug(".prepared called on uninitialized %r", self)
        return self._prepared is not None

    def prepare(self):
        """Memoized plugin preparation."""
        assert self.initialized
        if self._prepared is None:
            try:
                self._initialized.prepare()
            except errors.MisconfigurationError as error:
                logger.debug("Misconfigured %r: %s", self, error, exc_info=True)
                self._prepared = error
            except errors.NoInstallationError as error:
                logger.debug(
                    "No installation (%r): %s", self, error, exc_info=True)
                self._prepared = error
            except errors.PluginError as error:
                logger.debug("Other error:(%r): %s", self, error, exc_info=True)
                self._prepared = error
            else:
                self._prepared = True
        return self._prepared

    @property
    def misconfigured(self):
        """Is plugin misconfigured?"""
        return isinstance(self._prepared, errors.MisconfigurationError)

    @property
    def problem(self):
        """Return the Exception raised during plugin setup, or None if all is well"""
        if isinstance(self._prepared, Exception):
            return self._prepared
        return None

    @property
    def available(self):
        """Is plugin available, i.e. prepared or misconfigured?"""
        return self._prepared is True or self.misconfigured

    def __repr__(self):
        return "PluginEntryPoint#{0}".format(self.name)

    def __str__(self):
        lines = [
            "* {0}".format(self.name),
            "Description: {0}".format(self.plugin_cls.description),
            "Interfaces: {0}".format(", ".join(
                iface.__name__ for iface in zope.interface.implementedBy(
                    self.plugin_cls))),
            "Entry point: {0}".format(self.entry_point),
        ]

        if self.initialized:
            lines.append("Initialized: {0}".format(self.init()))
            if self.prepared:
                lines.append("Prep: {0}".format(self.prepare()))

        return "\n".join(lines)


class PluginsRegistry(collections.Mapping):
    """Plugins registry."""

    def __init__(self, plugins):
        # plugins are sorted so the same order is used between runs.
        # This prevents deadlock caused by plugins acquiring a lock
        # and ensures at least one concurrent Certbot instance will run
        # successfully.
        self._plugins = OrderedDict(sorted(six.iteritems(plugins)))

    @classmethod
    def find_all(cls):
        """Find plugins using setuptools entry points."""
        plugins = {}
        # pylint: disable=not-callable
        entry_points = itertools.chain(
            pkg_resources.iter_entry_points(
                constants.SETUPTOOLS_PLUGINS_ENTRY_POINT),
            pkg_resources.iter_entry_points(
                constants.OLD_SETUPTOOLS_PLUGINS_ENTRY_POINT),)
        for entry_point in entry_points:
            plugin_ep = PluginEntryPoint(entry_point)
            assert plugin_ep.name not in plugins, (
                "PREFIX_FREE_DISTRIBUTIONS messed up")
            # providedBy | pylint: disable=no-member
            if interfaces.IPluginFactory.providedBy(plugin_ep.plugin_cls):
                plugins[plugin_ep.name] = plugin_ep
            else:  # pragma: no cover
                logger.warning(
                    "%r does not provide IPluginFactory, skipping", plugin_ep)
        return cls(plugins)

    def __getitem__(self, name):
        return self._plugins[name]

    def __iter__(self):
        return iter(self._plugins)

    def __len__(self):
        return len(self._plugins)

    def init(self, config):
        """Initialize all plugins in the registry."""
        return [plugin_ep.init(config) for plugin_ep
                in six.itervalues(self._plugins)]

    def filter(self, pred):
        """Filter plugins based on predicate."""
        return type(self)(dict((name, plugin_ep) for name, plugin_ep
                               in six.iteritems(self._plugins) if pred(plugin_ep)))

    def visible(self):
        """Filter plugins based on visibility."""
        return self.filter(lambda plugin_ep: not plugin_ep.hidden)

    def ifaces(self, *ifaces_groups):
        """Filter plugins based on interfaces."""
        # pylint: disable=star-args
        return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))

    def verify(self, ifaces):
        """Filter plugins based on verification."""
        return self.filter(lambda p_ep: p_ep.verify(ifaces))

    def prepare(self):
        """Prepare all plugins in the registry."""
        return [plugin_ep.prepare() for plugin_ep in six.itervalues(self._plugins)]

    def available(self):
        """Filter plugins based on availability."""
        return self.filter(lambda p_ep: p_ep.available)
        # successfully prepared + misconfigured

    def find_init(self, plugin):
        """Find an initialized plugin.

        This is particularly useful for finding a name for the plugin
        (although `.IPluginFactory.__call__` takes ``name`` as one of
        the arguments, ``IPlugin.name`` is not part of the interface)::

          # plugin is an instance providing IPlugin, initialized
          # somewhere else in the code
          plugin_registry.find_init(plugin).name

        Returns ``None`` if ``plugin`` is not found in the registry.

        """
        # use list instead of set because PluginEntryPoint is not hashable
        candidates = [plugin_ep for plugin_ep in six.itervalues(self._plugins)
                      if plugin_ep.initialized and plugin_ep.init() is plugin]
        assert len(candidates) <= 1
        if candidates:
            return candidates[0]
        else:
            return None

    def __repr__(self):
        return "{0}({1})".format(
            self.__class__.__name__, ','.join(
                repr(p_ep) for p_ep in six.itervalues(self._plugins)))

    def __str__(self):
        if not self._plugins:
            return "No plugins"
        return "\n\n".join(str(p_ep) for p_ep in six.itervalues(self._plugins))