/usr/lib/python3/dist-packages/axes/signals.py is in python3-django-axes 4.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | import logging
from django.contrib.auth.signals import user_logged_in
from django.contrib.auth.signals import user_logged_out
from django.contrib.auth.signals import user_login_failed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.dispatch import Signal
from django.utils import timezone
from ipware.ip import get_ip
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.models import AccessLog, AccessAttempt
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache
# Logger shared by every signal handler in this module; its name is taken
# from the AXES_LOGGER setting.
log = logging.getLogger(settings.AXES_LOGGER)
# Signal sent by log_user_login_failed when a client reaches the failure
# limit. It is sent with the request, the attempted username and the
# client IP address as keyword arguments.
# NOTE(review): ``providing_args`` is reported as deprecated in newer Django
# releases -- confirm against the Django version this package targets.
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
@receiver(user_login_failed)
def log_user_login_failed(sender, credentials, request, **kwargs):
    """Handle ``user_login_failed``: record the failure, signal lockouts.

    The failure count is read from the axes cache when an entry exists,
    otherwise from the highest ``failures_since_start`` among the attempts
    returned by ``get_user_attempts``.  It is then incremented and written
    back to the cache.  Every matching ``AccessAttempt`` is updated in
    place (one info line is logged per updated attempt); when there is no
    matching attempt a new ``AccessAttempt`` is created instead.

    ``user_locked_out`` is sent when the count has reached
    ``AXES_FAILURE_LIMIT``, ``AXES_LOCK_OUT_AT_FAILURE`` is set and
    ``is_user_lockable`` accepts the request.

    Nothing is recorded when ``request`` is ``None``, when ``credentials``
    lacks the configured username field, or when
    ``AXES_NEVER_LOCKOUT_WHITELIST`` is set and the client IP is
    whitelisted.
    """
    if request is None or settings.AXES_USERNAME_FORM_FIELD not in credentials:
        log.error('Attempt to authenticate with a custom backend failed.')
        return

    ip_address = get_ip(request)
    username = credentials[settings.AXES_USERNAME_FORM_FIELD]

    # Truncate the request headers to the lengths used when storing them.
    meta = request.META
    user_agent = meta.get('HTTP_USER_AGENT', '<unknown>')[:255]
    path_info = meta.get('PATH_INFO', '<unknown>')[:255]
    http_accept = meta.get('HTTP_ACCEPT', '<unknown>')[:1025]

    def _describe_client():
        # Client description embedded in every log message below.
        return get_client_str(username, ip_address, user_agent, path_info)

    if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
        return

    attempts = get_user_attempts(request)
    cache_key = get_cache_key(request)
    timeout = get_cache_timeout()

    # Prefer the cached counter; fall back to the largest stored count
    # (or zero when there is no stored attempt at all).
    failures = get_axes_cache().get(cache_key)
    if failures is None:
        failures = max(
            [0] + [known.failures_since_start for known in attempts]
        )

    # Count the failure being handled right now.
    failures += 1
    get_axes_cache().set(cache_key, failures, timeout)

    if not attempts:
        # First failure for this client: store everything we know.  Which
        # of these fields take part in counting is decided elsewhere.
        new_record = {
            'user_agent': user_agent,
            'ip_address': ip_address,
            'username': username,
            'get_data': query2str(request.GET),
            'post_data': query2str(request.POST),
            'http_accept': http_accept,
            'path_info': path_info,
            'failures_since_start': failures,
        }
        AccessAttempt.objects.create(**new_record)

        log.info(
            'AXES: New login failure by {0}. Creating access record.'.format(
                _describe_client()
            )
        )
    else:
        # Known client: append the new request data to each stored attempt.
        for known in attempts:
            get_history = (known.get_data, query2str(request.GET))
            known.get_data = '%s\n---------\n%s' % get_history
            post_history = (known.post_data, query2str(request.POST))
            known.post_data = '%s\n---------\n%s' % post_history
            known.http_accept = http_accept
            known.path_info = path_info
            known.failures_since_start = failures
            known.attempt_time = timezone.now()
            known.save()

            log.info('{0} {1}'.format(
                'AXES: Repeated login failure by {0}.'.format(
                    _describe_client()
                ),
                'Count = {0} of {1}'.format(
                    failures, settings.AXES_FAILURE_LIMIT
                ),
            ))

    # Whichever branch ran above, decide whether this failure locks the
    # client out; users flagged as not lockable are exempt.
    locks_out = (
        failures >= settings.AXES_FAILURE_LIMIT and
        settings.AXES_LOCK_OUT_AT_FAILURE and
        is_user_lockable(request)
    )
    if not locks_out:
        return

    log.warning('AXES: locked out {0} after repeated login attempts.'.format(
        _describe_client()
    ))

    # Let interested receivers know that a lockout happened.
    user_locked_out.send(
        'axes', request=request, username=username, ip_address=ip_address
    )
@receiver(user_logged_in)
def log_user_logged_in(sender, request, user, **kwargs):
    """Handle ``user_logged_in``: log the login and store an ``AccessLog``.

    The successful login is always written to the axes logger.  An
    ``AccessLog`` row marked ``trusted`` is created as well, unless
    ``AXES_DISABLE_SUCCESS_ACCESS_LOG`` is set.
    """
    username = user.get_username()
    ip_address = get_ip(request)

    # Truncate the request headers to the lengths used when storing them.
    meta = request.META
    entry = {
        'user_agent': meta.get('HTTP_USER_AGENT', '<unknown>')[:255],
        'ip_address': ip_address,
        'username': username,
        'http_accept': meta.get('HTTP_ACCEPT', '<unknown>')[:1025],
        'path_info': meta.get('PATH_INFO', '<unknown>')[:255],
    }

    client = get_client_str(
        username, ip_address, entry['user_agent'], entry['path_info']
    )
    log.info('AXES: Successful login by {0}.'.format(client))

    if settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
        return

    AccessLog.objects.create(trusted=True, **entry)
@receiver(user_logged_out)
def log_user_logged_out(sender, request, user, **kwargs):
    """Handle ``user_logged_out``: log the logout and close access logs.

    The logout is always written to the axes logger.  When ``user`` is
    truthy and ``AXES_DISABLE_ACCESS_LOG`` is not set, every ``AccessLog``
    row for that username whose ``logout_time`` is still null gets the
    current time as its ``logout_time``.
    """
    log.info('AXES: Successful logout by {0}.'.format(user))

    # Nothing to update without a user or when access logging is disabled.
    if not user or settings.AXES_DISABLE_ACCESS_LOG:
        return

    unclosed = AccessLog.objects.filter(
        username=user.get_username(),
        logout_time__isnull=True,
    )
    unclosed.update(logout_time=timezone.now())
@receiver(post_save, sender=AccessAttempt)
def update_cache_after_save(instance, **kwargs):
    """Seed the failure cache from a saved ``AccessAttempt``.

    Runs after every ``AccessAttempt`` save.  When the cache holds no
    truthy value under the instance's cache key, the instance's
    ``failures_since_start`` is stored there with the configured timeout.
    An existing truthy entry is left untouched.
    """
    key = get_cache_key(instance)
    if get_axes_cache().get(key):
        return

    timeout = get_cache_timeout()
    get_axes_cache().set(key, instance.failures_since_start, timeout)
@receiver(post_delete, sender=AccessAttempt)
def delete_cache_after_delete(instance, **kwargs):
    """Drop the cached failure count of a deleted ``AccessAttempt``.

    Runs after every ``AccessAttempt`` delete and removes the entry stored
    under the instance's cache key from the axes cache.
    """
    stale_key = get_cache_key(instance)
    cache = get_axes_cache()
    cache.delete(stale_key)
|