/usr/lib/python3/dist-packages/pydap/cas/get_cookies.py is in python3-pydap 3.2.2+ds1-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | from bs4 import BeautifulSoup
from six.moves.urllib.parse import urlsplit, urlunsplit
import warnings
import requests
from requests.packages.urllib3.exceptions import (InsecureRequestWarning,
InsecurePlatformWarning)
import copy
from .. import lib
ssl_verify_categories = [InsecureRequestWarning,
InsecurePlatformWarning]
def setup_session(uri,
username=None,
password=None,
check_url=None,
session=None,
verify=True,
username_field='username',
password_field='password'):
'''
A general function to set-up requests session with cookies
using beautifulsoup and by calling the right url.
'''
if session is None:
# Connections must be closed since some CAS
# will cough when connections are kept alive:
headers = [('User-agent', 'Pydap/{}'.format(lib.__version__)),
('Connection', 'close')]
session = requests.Session()
session.headers.update(headers)
if uri is None:
return session
if not verify:
verify_flag = session.verify
session.verify = False
if isinstance(uri, str):
url = uri
else:
url = uri(check_url)
if password is None or password == '':
warnings.warn('password was not set. '
'this was likely unintentional '
'but will result is much fewer datasets.')
if not verify:
session.verify = verify_flag
return session
# Allow for several subsequent security layers:
full_url = copy.copy(url)
if isinstance(full_url, list):
url = full_url[0]
with warnings.catch_warnings():
if not verify:
# Catch warnings. It is assumed that the
# user that explicitly uses verify=False
# is either fully aware of the risks
# or cannot avoid the risks because of
# an improperly configured server.
# This error will usually occur with
# ESGF authentication.
for category in ssl_verify_categories:
warnings.filterwarnings("ignore",
category=category)
response = soup_login(session, url, username, password,
username_field=username_field,
password_field=password_field)
# If there are further security levels.
# At the moment only used for CEDA OPENID:
if (isinstance(full_url, list) and
len(full_url) > 1):
for url in full_url[1:]:
response = soup_login(session, response.url,
username, password,
username_field=None,
password_field=None)
response.close()
if check_url:
if (username is not None and
password is not None):
res = session.get(check_url, auth=(username, password))
if res.status_code == 401:
res = session.get(res.url, auth=(username, password))
res.close()
raise_if_form_exists(check_url, session)
if not verify:
session.verify = verify_flag
return session
def raise_if_form_exists(url, session):
"""
This function raises a UserWarning if the link has forms
"""
user_warning = ('Navigate to {0}, '.format(url) +
'login and follow instructions. '
'It is likely that you have to perform some one-time '
'registration steps before acessing this data.')
resp = session.get(url)
soup = BeautifulSoup(resp.content, 'lxml')
if len(soup.select('form')) > 0:
raise UserWarning(user_warning)
def soup_login(session, url, username, password,
username_field='username',
password_field='password'):
resp = session.get(url)
soup = BeautifulSoup(resp.content, 'lxml')
login_form = soup.select('form')[0]
def get_to_url(current_url, to_url):
split_current = urlsplit(current_url)
split_to = urlsplit(to_url)
comb = [val2 if val1 == '' else val1
for val1, val2 in zip(split_to, split_current)]
return urlunsplit(comb)
to_url = get_to_url(resp.url, login_form.get('action'))
session.headers['Referer'] = resp.url
payload = {}
if username_field is not None:
if len(login_form.findAll('input', {'name': username_field})) > 0:
payload.update({username_field: username})
if password_field is not None:
if len(login_form.findAll('input', {'name': password_field})) > 0:
payload.update({password_field: password})
else:
# If there is no password_field, it might be because
# something should be handled in the browser
# for the first attempt. This is common when using
# pydap with the ESGF for the first time.
raise Exception('Navigate to {0}. '
'If you are unable to '
'login, you must either '
'wait or use authentication '
'from another service.'
.format(url))
# Replicate all other fields:
for input in login_form.findAll('input'):
if (input.get('name') not in payload and
input.get('name') is not None):
payload.update({input.get('name'): input.get('value')})
# Remove other submit fields:
submit_type = 'submit'
submit_names = [input.get('name') for input
in login_form.findAll('input', {'type': submit_type})]
for input in login_form.findAll('input', {'type': submit_type}):
if ('submit' in submit_names and
input.get('name').lower() != 'submit'):
payload.pop(input.get('name'), None)
return session.post(to_url, data=payload)
|