/usr/share/pyshared/lazr/restful/_bytestorage.py is in python-lazr.restful 0.19.3-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | # Copyright 2008-2009 Canonical Ltd. All rights reserved.
#
# This file is part of lazr.restful
#
# lazr.restful is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# lazr.restful is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with lazr.restful. If not, see <http://www.gnu.org/licenses/>.
"""Classes for a resource that implements a binary file repository."""
__metaclass__ = type
__all__ = [
'ByteStorageResource',
]
from zope.interface import implements
from zope.publisher.interfaces import NotFound
from zope.schema import ValidationError
from lazr.restful.interfaces import IByteStorageResource
from lazr.restful._resource import HTTPResource
class ByteStorageResource(HTTPResource):
"""A resource providing read-write access to a stored byte stream."""
implements(IByteStorageResource)
def __call__(self):
"""Handle a GET, PUT, or DELETE request."""
if self.request.method == "GET":
return self.do_GET()
else:
if self.context.field.readonly:
# Read-only resources only support GET.
allow_string = "GET"
elif self.request.method == "PUT":
type = self.request.headers['Content-Type']
disposition, params = self._parseContentDispositionHeader(
self.request.headers['Content-Disposition'])
cache_stream = self.request.bodyStream.getCacheStream()
representation = cache_stream.read()
return self.do_PUT(type, representation,
params.get('filename'))
elif self.request.method == "DELETE":
return self.do_DELETE()
else:
allow_string = "GET PUT DELETE"
# The client tried to invoke an unsupported HTTP method.
self.request.response.setStatus(405)
self.request.response.setHeader("Allow", allow_string)
def do_GET(self):
"""See `IByteStorageResource`."""
media_type = self.getPreferredSupportedContentType()
if media_type in [self.WADL_TYPE, self.DEPRECATED_WADL_TYPE]:
result = self.toWADL().encode("utf-8")
self.request.response.setHeader('Content-Type', media_type)
return result
if not self.context.is_stored:
# No stored document exists here yet.
raise NotFound(self.context, self.context.filename, self.request)
self.request.response.setStatus(303) # See Other
self.request.response.setHeader('Location', self.context.alias_url)
def do_PUT(self, type, representation, filename):
"""See `IByteStorageResource`."""
try:
self.context.field.validate(representation)
except ValidationError, e:
self.request.response.setStatus(400) # Bad Request
return str(e)
self.context.createStored(type, representation, filename)
def do_DELETE(self):
"""See `IByteStorageResource`."""
self.context.deleteStored()
|