This file is indexed.

/usr/lib/python3/dist-packages/channels/message.py is in python3-django-channels 1.1.8.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from __future__ import unicode_literals

import copy
import threading
import time

from .channel import Channel
from .signals import consumer_finished, consumer_started


class Message(object):
    """
    Represents a message sent over a Channel.

    The message content is a dict called .content, while
    reply_channel is an optional extra attribute representing a channel
    to use to reply to this message's end user, if that makes sense.
    """

    def __init__(self, content, channel_name, channel_layer):
        self.content = content
        self.channel = Channel(
            channel_name,
            channel_layer=channel_layer,
        )
        self.channel_layer = channel_layer
        if content.get("reply_channel", None):
            self.reply_channel = Channel(
                content["reply_channel"],
                channel_layer=self.channel_layer,
            )
        else:
            self.reply_channel = None

    def __getitem__(self, key):
        return self.content[key]

    def __setitem__(self, key, value):
        self.content[key] = value

    def __contains__(self, key):
        return key in self.content

    def keys(self):
        return self.content.keys()

    def values(self):
        return self.content.values()

    def items(self):
        return self.content.items()

    def get(self, key, default=None):
        return self.content.get(key, default)

    def copy(self):
        """
        Returns a safely content-mutable copy of this Message.
        """
        return self.__class__(
            copy.deepcopy(self.content),
            self.channel.name,
            self.channel_layer,
        )


class PendingMessageStore(object):
    """
    Singleton object used for storing pending messages that should be sent
    to a channel or group when a consumer finishes.

    Will retry when it sees ChannelFull up to a limit; if you want more control
    over this, change to `immediately=True` in your send method and handle it
    yourself.
    """

    threadlocal = threading.local()

    retry_time = 2  # seconds
    retry_interval = 0.2  # seconds

    def prepare(self, **kwargs):
        """
        Sets the message store up to receive messages.
        """
        self.threadlocal.messages = []

    @property
    def active(self):
        """
        Returns if the pending message store can be used or not
        (it can only be used inside consumers)
        """
        return hasattr(self.threadlocal, "messages")

    def append(self, sender, message):
        self.threadlocal.messages.append((sender, message))

    def send_and_flush(self, **kwargs):
        for sender, message in getattr(self.threadlocal, "messages", []):
            # Loop until the retry time limit is hit
            started = time.time()
            while time.time() - started < self.retry_time:
                try:
                    sender.send(message, immediately=True)
                except sender.channel_layer.ChannelFull:
                    time.sleep(self.retry_interval)
                    continue
                else:
                    break
            # If we didn't break out, we failed to send, so do a nice exception
            else:
                raise RuntimeError(
                    "Failed to send queued message to %s after retrying for %.2fs.\n"
                    "You need to increase the consumption rate on this channel, its capacity,\n"
                    "or handle the ChannelFull exception yourself after adding\n"
                    "immediately=True to send()." % (sender, self.retry_time)
                )
        delattr(self.threadlocal, "messages")


pending_message_store = PendingMessageStore()
consumer_started.connect(pending_message_store.prepare)
consumer_finished.connect(pending_message_store.send_and_flush)