/usr/share/pyshared/quodlibet/util/modulescanner.py is in exfalso 3.0.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
# Copyright 2012 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation
import os
import sys
import imp
from os.path import join, splitext, dirname, basename
from traceback import format_exception
from quodlibet import util
def load_dir_modules(path, package, load_compiled=False):
    """Import every module that lives directly inside `path`.

    The directory is not searched recursively and entries whose name
    starts with an underscore are ignored. ``.pyc`` files are only
    considered when `load_compiled` is true. Each module is imported
    as ``<package>.<name>`` through load_module(); a module that is
    already loaded is not reloaded.

    Returns the list of modules. When the directory can't be listed a
    warning is printed and an empty list is returned. A module that
    raises while being imported gets its traceback printed and is
    left out of the result.
    """

    try:
        filenames = os.listdir(path)
    except OSError:
        print_w("%r not found" % path)
        return []

    # a set, so that "foo.py" next to "foo.pyc" means a single import
    names = set()
    for filename in filenames:
        if filename.startswith("_"):
            continue

        if filename.endswith(".py"):
            suffix = ".py"
        elif load_compiled and filename.endswith(".pyc"):
            suffix = ".pyc"
        else:
            continue

        names.add(filename[:-len(suffix)])

    imported = []
    for name in names:
        try:
            module = load_module(name, package, path)
        except Exception:
            util.print_exc()
        else:
            # load_module() returns None if there was nothing to import
            if module:
                imported.append(module)

    return imported
def get_importables(folder):
    """Search a folder and its subfolders for modules and packages to import.

    This is a generator yielding ``(import_path, dependencies)`` tuples:

    * For a directory containing an ``__init__.py`` the import path is the
      directory itself and the dependencies are all ``.py`` files directly
      inside it (including ``__init__.py`` and underscore-prefixed files).
    * For any other directory every ``.py`` file whose name doesn't start
      with an underscore is yielded on its own, being its only dependency.

    Every directory found by os.walk() is judged on its own, so the
    subfolders of a package are not treated as part of that package.
    ``.pyc`` and ``.so`` files are not supported.

    The dependencies are always a real list, so callers can iterate them
    more than once, and directories/files are visited in sorted order to
    make the result independent of the file system's listing order.
    """

    def is_module(filename):
        return filename.endswith(".py") and not filename.startswith("_")

    for root, dirs, names in os.walk(folder):
        # Sorting `dirs` in place also fixes the order in which os.walk()
        # descends into the subfolders.
        dirs.sort()
        names.sort()

        if "__init__.py" in names:
            # Check the extension on the bare file name only; testing the
            # joined path for a leading underscore would throw away every
            # dependency whenever `folder` itself starts with "_".
            yield (root,
                   [join(root, name) for name in names
                    if name.endswith(".py")])
        else:
            for name in names:
                if is_module(name):
                    module_path = join(root, name)
                    yield (module_path, [module_path])
def load_module(name, package, path, reload=False):
    """Load a module/package. Returns the module or None.

    name    -- name of the module/package to look for in `path`
    package -- dotted name of the parent package; the module ends up in
               sys.modules as ``<package>.<name>``
    path    -- directory that is searched for `name`
    reload  -- if True and the module is already loaded, reload it
               instead of returning the loaded one

    Returns None if nothing importable called `name` is found in `path`.
    Doesn't catch any exceptions during the actual import.
    """

    fullname = package + "." + name

    # The loaded module may only be handed back if no reload was asked
    # for; otherwise fall through to the import below.
    if not reload:
        try:
            return sys.modules[fullname]
        except KeyError:
            pass

    try:
        # this also handles packages
        fp, filename, desc = imp.find_module(name, [path])
    except ImportError:
        return None

    # modules need a parent package
    if package not in sys.modules:
        sys.modules[package] = imp.new_module(package)

    try:
        # for a module that is already in sys.modules this acts as a reload
        mod = imp.load_module(fullname, fp, filename, desc)
    finally:
        # find_module() returns no file object for packages
        if fp:
            fp.close()

    # make it accessible from the parent, like __import__ does
    vars(sys.modules[package])[name] = mod

    return mod
class ModuleScanner(object):
    """
    Handles plugin modules. Takes a list of directories and searches
    for loadable python modules/packages in all of them.

    There is only one global namespace for modules using the module name
    as key.

    rescan() - Update the module list. Returns removed/added modules
    failures - A dict of Name: (Exception, Text) for all modules that failed
    modules - A dict of Name: Module for all successfully loaded modules
    """

    def __init__(self, folders):
        """`folders` is the list of directories searched by rescan()."""

        self.__folders = folders

        self.__modules = {}   # name: module
        self.__info = {}      # name: (path, deps)
        self.__deps = {}      # dep: mtime of last check
        self.__failures = {}  # name: (exception, formatted traceback)

    def __remove_module(self, name):
        """Forget everything recorded about the loaded module `name`."""

        del self.__modules[name]
        path, deps = self.__info.pop(name)
        for dep in deps:
            # tolerate a dependency that was never recorded
            self.__deps.pop(dep, None)
        self.__failures.pop(name, None)

    @property
    def failures(self):
        """A name: (exception, text) dict for all modules that failed to
        load during the last rescan; text is the formatted traceback"""

        return self.__failures

    @property
    def modules(self):
        """A name: module dict of all loaded modules"""

        return self.__modules

    def rescan(self):
        """Rescan all folders for changed/new/removed modules.

        The caller should release all references to removed modules.

        Returns a tuple: (removed, added), both lists of module names.
        A changed module shows up in both lists.
        """

        print_d("Rescanning..")

        info = {}

        # get what is there atm
        for folder in self.__folders:
            for path, deps in get_importables(folder):
                # take the basename as module key, later modules win.
                # deps gets copied into a list because it is iterated
                # more than once below.
                info[splitext(basename(path))[0]] = (path, list(deps))

        def deps_changed(old_list, new_list):
            # a dependency got added or removed
            if set(old_list) != set(new_list):
                return True
            # a dependency was modified since the last check
            for dep in old_list:
                if util.mtime(dep) != self.__deps[dep]:
                    return True
            return False

        # python can not unload a module, so we can only add new ones
        # or reload if the path is the same and mtime changed,
        # but we can still pretend we removed something

        removed = []
        added = []

        # remove those that are gone and changed ones; iterate over a
        # snapshot of the names since entries get deleted on the way
        for name in list(self.__modules):
            if name not in info:
                self.__remove_module(name)
                removed.append(name)
                continue

            old_deps = self.__info[name][1]
            new_deps = info[name][1]
            if deps_changed(old_deps, new_deps):
                self.__remove_module(name)
                removed.append(name)

        # Every module that isn't loaded is tried again below, so start
        # with a clean slate. Otherwise modules that got fixed or deleted
        # in the meantime would be reported as failed forever. The dict
        # is cleared in place because the failures property hands it out.
        self.__failures.clear()

        # add new ones
        for name, (path, deps) in info.items():
            if name in self.__modules:
                continue

            try:
                # add a real module, so that pickle works
                # http://code.google.com/p/quodlibet/issues/detail?id=1093
                parent = "quodlibet.fake"
                if parent not in sys.modules:
                    sys.modules[parent] = imp.new_module(parent)
                vars(sys.modules["quodlibet"])["fake"] = sys.modules[parent]

                mod = load_module(name, parent + ".plugins",
                                  dirname(path), reload=True)
                if mod is None:
                    # nothing importable found, not an error
                    continue
            except Exception as err:
                text = format_exception(*sys.exc_info())
                self.__failures[name] = (err, text)
            else:
                added.append(name)
                self.__modules[name] = mod
                self.__info[name] = info[name]
                # remember the mtimes to detect changes on the next rescan
                for dep in deps:
                    self.__deps[dep] = util.mtime(dep)

        print_d("Rescanning done: %d added, %d removed, %d error(s)" %
                (len(added), len(removed), len(self.__failures)))

        return removed, added
|