/usr/lib/python3/dist-packages/axes/attempts.py is in python3-django-axes 4.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | from datetime import timedelta
from hashlib import md5
from django.contrib.auth import get_user_model
from django.utils import timezone
from ipware.ip import get_ip
from axes.conf import settings
from axes.models import AccessAttempt
from axes.utils import get_axes_cache
def _query_user_attempts(request):
    """Return the ``AccessAttempt`` queryset that applies to this request.

    A first query looks for records of the submitted username: any record
    when ``AXES_ONLY_USER_FAILURES`` is set, otherwise only trusted records
    for the same IP (and user agent when ``AXES_USE_USER_AGENT`` is set).
    If that query is empty, a second query looks for untrusted records keyed
    by username, by username and IP, or by IP alone, depending on the
    lock-out settings, and additionally by user agent when
    ``AXES_USE_USER_AGENT`` is set.

    A queryset is always returned (possibly empty), never ``None``.
    """
    ip = get_ip(request)
    username = request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)

    # Resolve the user agent up front whenever it is part of the lookup.
    # It used to be assigned only in the ``elif`` branch below, so enabling
    # AXES_ONLY_USER_FAILURES together with AXES_USE_USER_AGENT left ``ua``
    # unbound and the fallback query raised UnboundLocalError.
    ua = None
    if settings.AXES_USE_USER_AGENT:
        ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]

    if settings.AXES_ONLY_USER_FAILURES:
        attempts = AccessAttempt.objects.filter(username=username)
    elif settings.AXES_USE_USER_AGENT:
        attempts = AccessAttempt.objects.filter(
            user_agent=ua, ip_address=ip, username=username, trusted=True
        )
    else:
        attempts = AccessAttempt.objects.filter(
            ip_address=ip, username=username, trusted=True
        )

    # Truth-testing the queryset evaluates it; fall back to the untrusted
    # records only when the first lookup matched nothing.
    if not attempts:
        params = {'trusted': False}

        if settings.AXES_ONLY_USER_FAILURES:
            params['username'] = username
        elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
            params['username'] = username
            params['ip_address'] = ip
        else:
            params['ip_address'] = ip

        if settings.AXES_USE_USER_AGENT:
            params['user_agent'] = ua

        attempts = AccessAttempt.objects.filter(**params)

    return attempts
def get_cache_key(request_or_obj):
    """
    Derive the cache key for a request or an AccessAttempt object.

    The key is ``axes-`` followed by the MD5 hex digest of the UTF-8
    encoded identifying attributes: the username, the IP address plus the
    username, or the IP address alone, depending on the lock-out settings,
    with the user agent appended when ``AXES_USE_USER_AGENT`` is set.
    Missing values contribute an empty byte string.

    :param request_or_obj: Request or AccessAttempt object
    :return cache-key: String, key to be used in cache system
    """
    if isinstance(request_or_obj, AccessAttempt):
        ip = request_or_obj.ip_address
        un = request_or_obj.username
        ua = request_or_obj.user_agent
    else:
        ip = get_ip(request_or_obj)
        un = request_or_obj.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
        ua = request_or_obj.META.get('HTTP_USER_AGENT', '<unknown>')[:255]

    ip_bytes = (ip or '').encode('utf-8')
    un_bytes = (un or '').encode('utf-8')
    ua_bytes = (ua or '').encode('utf-8')

    if settings.AXES_ONLY_USER_FAILURES:
        parts = [un_bytes]
    elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
        parts = [ip_bytes, un_bytes]
    else:
        parts = [ip_bytes]

    if settings.AXES_USE_USER_AGENT:
        parts.append(ua_bytes)

    digest = md5(b''.join(parts)).hexdigest()
    return 'axes-{}'.format(digest)
def get_cache_timeout():
    """Return the cache timeout in seconds derived from AXES_COOLOFF_TIME.

    A numeric setting is interpreted as a number of hours; any other truthy
    value has its ``total_seconds()`` method called. ``None`` is returned
    when the setting is unset or falsy.
    """
    cool_off = settings.AXES_COOLOFF_TIME
    if not cool_off:
        return None

    if isinstance(cool_off, (int, float)):
        cool_off = timedelta(hours=cool_off)

    return cool_off.total_seconds()
def get_user_attempts(request):
    """Return the request's access attempts after expiring stale ones.

    When ``AXES_COOLOFF_TIME`` is set, every attempt whose ``attempt_time``
    plus the cool-off period lies in the past is handled as follows:

    * a trusted attempt has ``failures_since_start`` reset to 0, is saved,
      and the cached failure count for this request is set to 0;
    * any other attempt is deleted, and the cached failure count (if one
      exists) is decremented by one.

    If anything was deleted the attempts are queried again so the returned
    queryset reflects the deletion.
    """
    attempts = _query_user_attempts(request)
    cache_hash_key = get_cache_key(request)
    cache_timeout = get_cache_timeout()

    cool_off = settings.AXES_COOLOFF_TIME
    if not cool_off:
        return attempts

    # A bare number is a cool-off period expressed in hours.
    if isinstance(cool_off, (int, float)):
        cool_off = timedelta(hours=cool_off)

    deleted_any = False
    for attempt in attempts:
        expires_at = attempt.attempt_time + cool_off
        if expires_at >= timezone.now():
            # Still inside the cool-off window; leave it alone.
            continue

        if attempt.trusted:
            attempt.failures_since_start = 0
            attempt.save()
            get_axes_cache().set(cache_hash_key, 0, cache_timeout)
            continue

        attempt.delete()
        deleted_any = True
        cached_failures = get_axes_cache().get(cache_hash_key)
        if cached_failures is not None:
            get_axes_cache().set(
                cache_hash_key, cached_failures - 1, cache_timeout
            )

    # The queryset was already evaluated by the loop above, so re-query to
    # drop the rows that were just deleted.
    if deleted_any:
        attempts = _query_user_attempts(request)

    return attempts
def ip_in_whitelist(ip):
    """Return True if ``ip`` is listed in a non-empty AXES_IP_WHITELIST."""
    whitelist = settings.AXES_IP_WHITELIST
    return ip in whitelist if whitelist else False
def ip_in_blacklist(ip):
    """Return True if ``ip`` is listed in a non-empty AXES_IP_BLACKLIST."""
    blacklist = settings.AXES_IP_BLACKLIST
    return ip in blacklist if blacklist else False
def is_user_lockable(request):
    """Tell whether the account behind this request may be locked out.

    If ``request.user`` has a ``nolockout`` attribute, its negation is
    returned. Otherwise, for POST requests, the user named in the submitted
    login form is looked up by the user model's ``USERNAME_FIELD`` and the
    negation of that user's ``nolockout`` attribute is returned when it has
    one. Non-POST requests, unknown usernames and users without the
    attribute are all lockable (``True``).
    """
    if hasattr(request.user, 'nolockout'):
        return not request.user.nolockout

    if request.method != 'POST':
        return True

    user_model = get_user_model()
    try:
        lookup_field = getattr(user_model, 'USERNAME_FIELD', 'username')
        submitted_name = request.POST.get(settings.AXES_USERNAME_FORM_FIELD)
        user = user_model.objects.get(**{lookup_field: submitted_name})

        if hasattr(user, 'nolockout'):
            # Inverted: a user flagged ``nolockout`` is *not* lockable.
            return not user.nolockout
    except user_model.DoesNotExist:
        # The submitted username does not match any account.
        return True

    # No exemption found, so the user can be locked out.
    return True
def is_already_locked(request):
    """Decide whether this request must be rejected as locked out.

    Checks are applied in order:

    1. GET requests are never locked when failures are tracked per user or
       per user+IP combination.
    2. Whitelisted IPs are never locked when AXES_NEVER_LOCKOUT_WHITELIST
       is set.
    3. Non-whitelisted IPs are always locked when AXES_ONLY_WHITELIST is set.
    4. Blacklisted IPs are always locked.
    5. Users that are not lockable are never locked.
    6. Otherwise the failure count is compared with AXES_FAILURE_LIMIT,
       using the cached count when one exists and the stored attempts when
       not; a lock only applies if AXES_LOCK_OUT_AT_FAILURE is enabled.
    """
    ip = get_ip(request)

    tracks_by_user = (
        settings.AXES_ONLY_USER_FAILURES or
        settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP
    )
    if tracks_by_user and request.method == 'GET':
        return False

    if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
        return False

    if settings.AXES_ONLY_WHITELIST and not ip_in_whitelist(ip):
        return True

    if ip_in_blacklist(ip):
        return True

    if not is_user_lockable(request):
        return False

    cache_hash_key = get_cache_key(request)
    failures_cached = get_axes_cache().get(cache_hash_key)

    if failures_cached is not None:
        return (
            failures_cached >= settings.AXES_FAILURE_LIMIT and
            settings.AXES_LOCK_OUT_AT_FAILURE
        )

    # Nothing cached: fall back to the stored attempts.
    return any(
        attempt.failures_since_start >= settings.AXES_FAILURE_LIMIT and
        settings.AXES_LOCK_OUT_AT_FAILURE
        for attempt in get_user_attempts(request)
    )
|