/usr/share/pyshared/zope/app/publication/requestpublicationregistry.py is in python-zope.app.publication 3.13.2-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""A registry for Request-Publication factories.
"""
__docformat__ = 'restructuredtext'
from zope.interface import implements
from zope.app.publication.interfaces import IRequestPublicationRegistry
from zope.configuration.exceptions import ConfigurationError
class RequestPublicationRegistry(object):
    """Registry of request-publication factories.

    The registry implements a three stage lookup for registered factories
    that have to deal with requests::

      {method -> {mimetype -> [{'priority' : some_int,
                                'factory' : factory,
                                'name' : some_name}, ...
                              ]
                 },
      }

    The `priority` is used to define a lookup-order when multiple factories
    are registered for the same method and mime-type.
    """
    implements(IRequestPublicationRegistry)

    def __init__(self):
        self._d = {}    # method -> {mimetype -> [factory_data, ...]}

    def register(self, method, mimetype, name, priority, factory):
        """Register a factory for method+mimetype.

        An entry already registered under `name` for the same
        method+mimetype is replaced.  The entries are kept ordered by
        descending `priority`.

        Raises `ConfigurationError` if the registration would leave two
        entries with the same priority for this method+mimetype.  In that
        case the previously registered entries are left untouched.
        """
        # initialize the two-level deep nested datastructure if necessary
        entries = self._d.setdefault(method, {}).setdefault(mimetype, [])

        # Work on a copy so that a failed registration cannot leave a
        # half-updated list behind.  Dropping the entry with the same name
        # implements "replace an already registered publisher factory".
        candidates = [entry for entry in entries if entry['name'] != name]

        # add the publisher factory + additional informations
        candidates.append(
            {'name': name, 'factory': factory, 'priority': priority})

        # check if the priorities are unique
        priorities = set(entry['priority'] for entry in candidates)
        if len(priorities) != len(candidates):
            raise ConfigurationError('All registered publishers for a given '
                                     'method+mimetype must have distinct '
                                     'priorities. Please check your ZCML '
                                     'configuration')

        # order by descending priority
        candidates.sort(key=lambda entry: entry['priority'], reverse=True)

        # Commit in place, so that the list object stored in the registry
        # keeps its identity.
        entries[:] = candidates

    def getFactoriesFor(self, method, mimetype):
        """Return the list of factory data for method+mimetype.

        Returns `None` if nothing is stored under that combination.
        """
        # `mimetype` might be something like 'text/xml; charset=utf8'.  In
        # this case we are only interested in the first part.
        mimetype = mimetype.split(';', 1)[0].strip()
        try:
            return self._d[method][mimetype]
        except KeyError:
            return None

    def lookup(self, method, mimetype, environment):
        """Lookup a factory for a given method+mimetype and a environment.

        The candidates are taken from the first non-empty registration
        among (method, mimetype), (method, '*') and ('*', '*').  The first
        candidate, in descending priority order, whose `canHandle` accepts
        `environment` is returned.

        Raises `ConfigurationError` if none of the three combinations has
        a registered factory.  Returns `None` if candidates exist but none
        of them can handle the environment.
        """
        for m, mt in ((method, mimetype), (method, '*'), ('*', '*')):
            factory_lst = self.getFactoriesFor(m, mt)
            if factory_lst:
                break
        else:
            raise ConfigurationError('No registered publisher found '
                                     'for (%s/%s)' % (method, mimetype))

        # now iterate over all factory candidates and let them introspect
        # the request environment to figure out if they can handle the
        # request
        for d in factory_lst:
            factory = d['factory']
            if factory.canHandle(environment):
                return factory

        # Actually we should never get here unless the configuration is
        # improper (no default handler for method=* and mimetype=*)
        return None
# The module-level registry instance.
factoryRegistry = RequestPublicationRegistry()


def _resetFactoryRegistry():
    """Empty the module-level registry by re-running its initializer."""
    # `factoryRegistry` is looked up at call time, not captured here.
    factoryRegistry.__init__()


try:
    import zope.testing.cleanup
except ImportError:
    # zope.testing could not be imported: no cleanup hook is registered.
    pass
else:
    zope.testing.cleanup.addCleanUp(_resetFactoryRegistry)
|