/usr/share/webcheck/schemes/http.py is in webcheck 1.10.4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# http.py - handle urls with a http scheme
#
# Copyright (C) 1998, 1999 Albert Hopkins (marduk)
# Copyright (C) 2002 Mike W. Meyer
# Copyright (C) 2005, 2006, 2007, 2008, 2010 Arthur de Jong
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# The files produced as output from the software do not automatically fall
# under the copyright of the software, unless explicitly stated otherwise.
"""This module defines the functions needed for filling in information in Link
objects for urls using the http scheme."""
import config
import debugio
import httplib
import urllib
import time
import urlparse
import base64
import socket
import re
# pattern for extracting character set information from content-type header;
# the captured value ends at whitespace or at the ';' that introduces the
# next parameter, and optional quotes around the value are not captured
_charsetpattern = re.compile(r'charset=["\']?([^\s;"\']*)', re.I)
# set socket timeout to configured value; this runs once as a side effect of
# importing this module and sets the default for sockets created afterwards
socket.setdefaulttimeout(config.IOTIMEOUT)
def fetch(link, acceptedtypes):
"""Open connection to url and report information given by GET command."""
# TODO: HTTP connection pooling?
# TODO: implement proxy requests for https
# split netloc in user:pass part and host:port part
(userpass, netloc) = urllib.splituser(link.netloc)
# if the URL did not contain userpass and the netloc is configured
# get the userpass from that
if not userpass and netloc in config.USERPASS:
userpass=config.USERPASS[netloc]
debugio.debug('schemes.http.fetch(): using userpass=%s' % userpass)
proxyuserpass = None
scheme = link.scheme
# check validity of netloc (to work around bug in idna module)
if netloc[0] == '.':
debugio.debug('schemes.http.fetch(): fail on hostname starting with dot')
link.add_linkproblem('hostname starts with a dot')
return None
# check which host to connect to (if using proxies)
if config.PROXIES and config.PROXIES.has_key(link.scheme):
# pass the complete url in the request, connecting to the proxy
path = urlparse.urlunsplit((link.scheme, netloc, link.path, link.query, ''))
(scheme, netloc) = urlparse.urlsplit(config.PROXIES[link.scheme])[0:2]
(proxyuserpass, netloc) = urllib.splituser(netloc)
else:
# otherwise direct connect to the server with partial url
path = urlparse.urlunsplit(('', '', link.path, link.query, ''))
# remove trailing : from netloc
if netloc[-1] == ':':
netloc = netloc[:-1]
conn = None
try:
try:
# create the connection
debugio.debug('schemes.http.fetch: connecting to %s' % netloc)
if scheme == 'http':
conn = httplib.HTTPConnection(netloc)
elif scheme == 'https':
conn = httplib.HTTPSConnection(netloc)
# start the request
conn.putrequest('GET', path, skip_host=True)
conn.putheader('Host', urllib.splitport(netloc)[0])
if len(link.parents) > 0:
conn.putheader('Referer', list(link.parents)[0].url)
if userpass is not None:
(user, passwd) = urllib.splitpasswd(userpass)
conn.putheader(
'Authorization',
'Basic '+base64.encodestring(str(user)+':'+str(passwd)).strip() )
if proxyuserpass is not None:
(user, passwd) = urllib.splitpasswd(proxyuserpass)
conn.putheader(
'Proxy-Authorization',
'Basic '+base64.encodestring(str(user)+':'+str(passwd)).strip() )
# bypass proxy cache
if config.BYPASSHTTPCACHE:
conn.putheader('Cache-control', 'no-cache')
conn.putheader('Pragma', 'no-cache')
conn.putheader('User-Agent','webcheck %s' % config.VERSION)
conn.endheaders()
# wait for the response
response = conn.getresponse()
link.status = '%s %s' % (response.status, response.reason)
debugio.debug('schemes.http.fetch(): HTTP response: %s' % link.status)
# dump proxy hit/miss debugging info
if config.PROXIES and config.PROXIES.has_key(link.scheme):
try:
debugio.debug('schemes.http.fetch(): X-Cache: %s' % str(response.getheader('X-Cache')))
except AttributeError:
pass
# retrieve some information from the headers
try:
link.mimetype = response.msg.gettype()
debugio.debug('schemes.http.fetch(): mimetype: %s' % str(link.mimetype))
except AttributeError:
pass
try:
link.set_encoding(_charsetpattern.search(response.getheader('Content-type')).group(1))
except (AttributeError, TypeError):
pass
try:
link.size = int(response.getheader('Content-length'))
debugio.debug('schemes.http.fetch(): size: %s' % str(link.size))
except (KeyError, TypeError):
pass
try:
link.mtime = time.mktime(response.msg.getdate('Last-Modified'))
debugio.debug('schemes.http.fetch(): mtime: %s' % time.strftime('%c', time.localtime(link.mtime)))
except (OverflowError, TypeError, ValueError):
pass
# handle redirects
# 301=moved permanently, 302=found, 303=see other, 307=temporary redirect
if response.status in (301, 302, 303, 307):
# consider a 301 (moved permanently) a problem
if response.status == 301:
link.add_linkproblem(str(response.status)+': '+response.reason)
# find url that is redirected to
location = urlparse.urljoin(link.url, response.getheader('Location', ''))
# create the redirect
link.redirect(location)
return None
elif response.status != 200:
# handle error responses
link.add_linkproblem(str(response.status)+': '+response.reason)
return None
elif link.mimetype in acceptedtypes:
# return succesful responses
# TODO: support gzipped content
# TODO: add checking for size
return response.read()
except httplib.HTTPException, e:
debugio.debug('error reading HTTP response: '+str(e))
link.add_linkproblem('error reading HTTP response: '+str(e))
return None
except socket.error, e:
if hasattr(e, 'args') and len(e.args) == 2:
debugio.debug("error reading HTTP response: "+str(e.args[1]))
link.add_linkproblem("error reading HTTP response: "+str(e.args[1]))
else:
debugio.debug("error reading HTTP response: "+str(e))
link.add_linkproblem("error reading HTTP response: "+str(e))
return None
except KeyboardInterrupt:
# handle this in a higher-level exception handler
raise
except Exception, e:
# handle all other exceptions
debugio.warn('unknown exception caught: '+str(e))
link.add_linkproblem('error reading HTTP response: '+str(e))
import traceback
traceback.print_exc()
return None
finally:
# close the connection before returning
if conn is not None:
conn.close()
|