/usr/share/pyshared/zope/catalog/catalog.py is in python-zope.catalog 3.8.2-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | ##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Catalog
"""
__docformat__ = 'restructuredtext'
import BTrees
import zope.index.interfaces
from zope import component
from zope.annotation.interfaces import IAttributeAnnotatable
from zope.container.btree import BTreeContainer
from zope.container.interfaces import IObjectAddedEvent
from zope.interface import implements
from zope.intid.interfaces import IIntIds, IIntIdAddedEvent, IIntIdRemovedEvent
from zope.lifecycleevent import IObjectModifiedEvent
from zope.location import location
from zope.location.interfaces import ILocationInfo
from zope.catalog.interfaces import ICatalog, INoAutoIndex, INoAutoReindex, ICatalogIndex
class ResultSet:
"""Lazily accessed set of objects."""
def __init__(self, uids, uidutil):
self.uids = uids
self.uidutil = uidutil
def __len__(self):
return len(self.uids)
def __iter__(self):
for uid in self.uids:
obj = self.uidutil.getObject(uid)
yield obj
class Catalog(BTreeContainer):
implements(ICatalog,
IAttributeAnnotatable,
zope.index.interfaces.IIndexSearch,
)
family = BTrees.family32
def __init__(self, family=None):
super(Catalog, self).__init__()
if family is not None:
self.family = family
def clear(self):
for index in self.values():
index.clear()
def index_doc(self, docid, texts):
"""Register the data in indexes of this catalog."""
for index in self.values():
index.index_doc(docid, texts)
def unindex_doc(self, docid):
"""Unregister the data from indexes of this catalog."""
for index in self.values():
index.unindex_doc(docid)
def _visitSublocations(self) :
"""Restricts the access to the objects that live within
the nearest site if the catalog itself is locatable.
"""
uidutil = None
locatable = ILocationInfo(self, None)
if locatable is not None :
site = locatable.getNearestSite()
sm = site.getSiteManager()
uidutil = sm.queryUtility(IIntIds)
if uidutil not in [c.component for c in sm.registeredUtilities()]:
# we do not have a local inits utility
uidutil = component.getUtility(IIntIds, context=self)
for uid in uidutil:
obj = uidutil.getObject(uid)
if location.inside(obj, site) :
yield uid, obj
return
if uidutil is None:
uidutil = component.getUtility(IIntIds)
for uid in uidutil:
yield uid, uidutil.getObject(uid)
def updateIndex(self, index):
for uid, obj in self._visitSublocations() :
index.index_doc(uid, obj)
def updateIndexes(self):
for uid, obj in self._visitSublocations() :
for index in self.values():
index.index_doc(uid, obj)
def apply(self, query):
results = []
for index_name, index_query in query.items():
index = self[index_name]
r = index.apply(index_query)
if r is None:
continue
if not r:
# empty results
return r
results.append((len(r), r))
if not results:
# no applicable indexes, so catalog was not applicable
return None
results.sort() # order from smallest to largest
_, result = results.pop(0)
for _, r in results:
_, result = self.family.IF.weightedIntersection(result, r)
return result
def searchResults(self, **searchterms):
sort_index = searchterms.pop('_sort_index', None)
limit = searchterms.pop('_limit', None)
reverse = searchterms.pop('_reverse', False)
results = self.apply(searchterms)
if results is not None:
if sort_index is not None:
index = self[sort_index]
if not zope.index.interfaces.IIndexSort.providedBy(index):
raise ValueError('Index %s does not support sorting.' % sort_index)
results = list(index.sort(results, limit=limit, reverse=reverse))
else:
if reverse or limit:
results = list(results)
if reverse:
results.reverse()
if limit:
del results[limit:]
uidutil = component.getUtility(IIntIds)
results = ResultSet(results, uidutil)
return results
@component.adapter(ICatalogIndex, IObjectAddedEvent)
def indexAdded(index, event):
"""When an index is added to a catalog, we have to index existing objects
When an index is added, we tell it's parent to index it:
>>> class FauxCatalog:
... def updateIndex(self, index):
... self.updated = index
>>> class FauxIndex:
... pass
>>> index = FauxIndex()
>>> index.__parent__ = FauxCatalog()
>>> indexAdded(index, None)
>>> index.__parent__.updated is index
True
"""
index.__parent__.updateIndex(index)
@component.adapter(IIntIdAddedEvent)
def indexDocSubscriber(event):
"""A subscriber to IntIdAddedEvent"""
ob = event.object
if INoAutoIndex.providedBy(ob):
return
for cat in component.getAllUtilitiesRegisteredFor(ICatalog, context=ob):
id = component.getUtility(IIntIds, context=cat).getId(ob)
cat.index_doc(id, ob)
@component.adapter(IObjectModifiedEvent)
def reindexDocSubscriber(event):
"""A subscriber to ObjectModifiedEvent"""
ob = event.object
if INoAutoReindex.providedBy(ob):
return
for cat in component.getAllUtilitiesRegisteredFor(ICatalog, context=ob):
id = component.getUtility(IIntIds, context=cat).queryId(ob)
if id is not None:
cat.index_doc(id, ob)
@component.adapter(IIntIdRemovedEvent)
def unindexDocSubscriber(event):
"""A subscriber to IntIdRemovedEvent"""
ob = event.object
for cat in component.getAllUtilitiesRegisteredFor(ICatalog, context=ob):
id = component.getUtility(IIntIds, context=cat).queryId(ob)
if id is not None:
cat.unindex_doc(id)
|