This file is indexed.

/usr/lib/python3/dist-packages/axes/signals.py is in python3-django-axes 4.1.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import logging

from django.contrib.auth.signals import user_logged_in
from django.contrib.auth.signals import user_logged_out
from django.contrib.auth.signals import user_login_failed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.dispatch import Signal
from django.utils import timezone

from ipware.ip import get_ip

from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.models import AccessLog, AccessAttempt
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache


log = logging.getLogger(settings.AXES_LOGGER)


user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])


@receiver(user_login_failed)
def log_user_login_failed(sender, credentials, request, **kwargs):
    """ Create an AccessAttempt record if the login wasn't successful
    """
    if request is None or settings.AXES_USERNAME_FORM_FIELD not in credentials:
        log.error('Attempt to authenticate with a custom backend failed.')
        return

    ip_address = get_ip(request)
    username = credentials[settings.AXES_USERNAME_FORM_FIELD]
    user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
    path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
    http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]

    if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
        return

    failures = 0
    attempts = get_user_attempts(request)
    cache_hash_key = get_cache_key(request)
    cache_timeout = get_cache_timeout()

    failures_cached = get_axes_cache().get(cache_hash_key)
    if failures_cached is not None:
        failures = failures_cached
    else:
        for attempt in attempts:
            failures = max(failures, attempt.failures_since_start)

    # add a failed attempt for this user
    failures += 1
    get_axes_cache().set(cache_hash_key, failures, cache_timeout)

    # has already attempted, update the info
    if len(attempts):
        for attempt in attempts:
            attempt.get_data = '%s\n---------\n%s' % (
                attempt.get_data,
                query2str(request.GET),
            )
            attempt.post_data = '%s\n---------\n%s' % (
                attempt.post_data,
                query2str(request.POST)
            )
            attempt.http_accept = http_accept
            attempt.path_info = path_info
            attempt.failures_since_start = failures
            attempt.attempt_time = timezone.now()
            attempt.save()

            fail_msg = 'AXES: Repeated login failure by {0}.'.format(
                get_client_str(username, ip_address, user_agent, path_info)
            )
            count_msg = 'Count = {0} of {1}'.format(
                failures, settings.AXES_FAILURE_LIMIT
            )
            log.info('{0} {1}'.format(fail_msg, count_msg))
    else:
        # Record failed attempt. Whether or not the IP address or user agent is
        # used in counting failures is handled elsewhere, so we just record
        # everything here.
        AccessAttempt.objects.create(
            user_agent=user_agent,
            ip_address=ip_address,
            username=username,
            get_data=query2str(request.GET),
            post_data=query2str(request.POST),
            http_accept=http_accept,
            path_info=path_info,
            failures_since_start=failures,
        )

        log.info(
            'AXES: New login failure by {0}. Creating access record.'.format(
                get_client_str(username, ip_address, user_agent, path_info)
            )
        )

    # no matter what, we want to lock them out if they're past the number of
    # attempts allowed, unless the user is set to notlockable
    if (
        failures >= settings.AXES_FAILURE_LIMIT and
        settings.AXES_LOCK_OUT_AT_FAILURE and
        is_user_lockable(request)
    ):
        log.warning('AXES: locked out {0} after repeated login attempts.'.format(
            get_client_str(username, ip_address, user_agent, path_info)
        ))

        # send signal when someone is locked out.
        user_locked_out.send(
            'axes', request=request, username=username, ip_address=ip_address
        )


@receiver(user_logged_in)
def log_user_logged_in(sender, request, user, **kwargs):
    """ When a user logs in, update the access log
    """
    username = user.get_username()
    ip_address = get_ip(request)
    user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
    path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
    http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
    log.info('AXES: Successful login by {0}.'.format(
        get_client_str(username, ip_address, user_agent, path_info)
    ))

    if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
        AccessLog.objects.create(
            user_agent=user_agent,
            ip_address=ip_address,
            username=username,
            http_accept=http_accept,
            path_info=path_info,
            trusted=True,
        )


@receiver(user_logged_out)
def log_user_logged_out(sender, request, user, **kwargs):
    """ When a user logs out, update the access log
    """
    log.info('AXES: Successful logout by {0}.'.format(user))

    if user and not settings.AXES_DISABLE_ACCESS_LOG:
        AccessLog.objects.filter(
            username=user.get_username(),
            logout_time__isnull=True,
        ).update(logout_time=timezone.now())


@receiver(post_save, sender=AccessAttempt)
def update_cache_after_save(instance, **kwargs):
    cache_hash_key = get_cache_key(instance)
    if not get_axes_cache().get(cache_hash_key):
        cache_timeout = get_cache_timeout()
        get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout)


@receiver(post_delete, sender=AccessAttempt)
def delete_cache_after_delete(instance, **kwargs):
    cache_hash_key = get_cache_key(instance)
    get_axes_cache().delete(cache_hash_key)