This file is indexed.

/usr/share/pyshared/checkbox/message.py is in checkbox 0.13.7.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
#
# This file is part of Checkbox.
#
# Copyright 2008 Canonical Ltd.
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.
#
import os
import logging
import itertools
import posixpath
import sys

from checkbox.contrib import bpickle
from checkbox.lib.safe import safe_close

# Single-character flags stored as a "_<flags>" suffix on a message file name.
# Messages flagged HELD or BROKEN are skipped when walking pending messages;
# BROKEN is added to a message whose content fails to load with ValueError.
HELD = "h"
BROKEN = "b"

# Returned by got_next_sequence() when the peer asks for a sequence number
# older than anything still accounted for by the pending offset.
ANCIENT = 1


class Message(dict):
    """A message ``dict`` that also remembers the file it is stored in.

    @param message: Mapping (or iterable of key/value pairs) used to
        populate the dictionary.
    @param filename: Path of the file backing this message; kept on the
        C{filename} attribute.
    """

    def __init__(self, message, filename):
        # The backing file name is plain instance state, independent of the
        # dictionary content.
        self.filename = filename
        super(Message, self).__init__(message)


class MessageStore(object):
    """A message store which stores its messages in a file system hierarchy.

    Messages are kept in numbered files inside numbered sub-directories of
    the store directory.  A file name may carry a C{_<flags>} suffix (see
    L{HELD} and L{BROKEN}).  Files ending in C{.tmp} are in-progress writes
    and are ignored when listing.
    """

    # This caches everything but a message's data, making it manageable to
    # keep in memory.  Entries are keyed by message file name.
    # NOTE: this is a class attribute, so the dict is shared by every
    # MessageStore instance.
    _message_cache = {}

    # Setting this to False speeds things up considerably, at the expense
    # of a higher risk of data loss during a crash.
    safe_file_closing = True

    def __init__(self, persist, directory, directory_size=1000):
        """Create the store, making C{directory} if it does not exist.

        @param persist: Object providing C{root_at()} and C{save()}; the
            view returned by C{root_at("message-store")} is used through
            its C{get()} and C{set()} methods to keep the store metadata.
        @param directory: Root directory of the message hierarchy.
        @param directory_size: Maximum number of message files per
            sub-directory before a new sub-directory is started.
        """
        self._directory = directory
        self._directory_size = directory_size
        self._original_persist = persist
        self._persist = persist.root_at("message-store")
        message_dir = self._message_dir()
        if not posixpath.isdir(message_dir):
            os.makedirs(message_dir)

    def commit(self):
        """Save metadata to disk."""
        self._original_persist.save()

    def get_sequence(self):
        """
        Get the sequence number of the message that the server expects us to
        send on the next exchange.
        """
        return self._persist.get("sequence", 0)

    def set_sequence(self, number):
        """
        Set the sequence number of the message that the server expects us to
        send on the next exchange.
        """
        self._persist.set("sequence", number)

    def get_pending_offset(self):
        """
        Get the offset into the message pool considered assigned to the
        current sequence number (0 when it was never set).
        """
        return self._persist.get("pending_offset", 0)

    def set_pending_offset(self, val):
        """
        Set the offset into the message pool to consider assigned to the
        current sequence number as returned by L{get_sequence}.
        """
        self._persist.set("pending_offset", val)

    def add_pending_offset(self, val=1):
        """Increase the pending offset by C{val}."""
        self.set_pending_offset(self.get_pending_offset() + val)

    def remove_pending_offset(self, val=1):
        """Decrease the pending offset by C{val}.

        @return: C{False}, leaving the offset untouched, if the result
            would be negative; C{True} otherwise.
        """
        pending_offset = self.get_pending_offset()
        if pending_offset - val < 0:
            return False

        self.set_pending_offset(pending_offset - val)
        return True

    def count_pending_messages(self):
        """Return the number of pending messages."""
        return sum(1 for _ in self._walk_pending_messages())

    def get_pending_messages(self, max=None):
        """Get any pending messages that aren't being held, up to max.

        A message whose content fails to load with C{ValueError} is logged,
        flagged as L{BROKEN} and left out of the result.

        @param max: Maximum number of messages to return, or C{None} for
            no limit.
        @return: A list of L{Message} objects.
        """
        messages = []
        for filename in self._walk_pending_messages():
            if max is not None and len(messages) >= max:
                break
            try:
                message = self._read_message(filename)
            except ValueError as e:
                logging.exception(e)
                self._add_flags(filename, BROKEN)
            else:
                messages.append(message)

        return messages

    def set_pending_flags(self, flags):
        """Replace the flags of the first pending message, if any."""
        for filename in self._walk_pending_messages():
            self._set_flags(filename, flags)
            # Only the first pending message is affected.
            break

    def add_pending_flags(self, flags):
        """Add C{flags} to the first pending message, if any."""
        for filename in self._walk_pending_messages():
            self._add_flags(filename, flags)
            # Only the first pending message is affected.
            break

    def delete_old_messages(self):
        """Delete messages which are unlikely to be needed in the future.

        These are the messages, neither held nor broken, which come before
        the pending offset.  Sub-directories left empty are removed too.
        """
        for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN),
                                   self.get_pending_offset()):
            os.unlink(fn)
            # Do not keep a cache entry for a file which no longer exists.
            self._message_cache.pop(fn, None)
            containing_dir = posixpath.split(fn)[0]
            if not os.listdir(containing_dir):
                os.rmdir(containing_dir)

    def delete_all_messages(self):
        """Remove ALL stored messages."""
        self.set_pending_offset(0)
        for filename in self._walk_messages():
            os.unlink(filename)
            # Do not keep a cache entry for a file which no longer exists.
            self._message_cache.pop(filename, None)

    def is_pending(self, message_id):
        """Return bool indicating if C{message_id} still hasn't been delivered.

        @param message_id: Identifier returned by the L{add()} method.
        """
        i = 0
        pending_offset = self.get_pending_offset()
        for filename in self._walk_messages(exclude=BROKEN):
            flags = self._get_flags(filename)
            # The message id is the inode number (see _write_message).
            if ((HELD in flags or i >= pending_offset) and
                os.stat(filename).st_ino == message_id):
                return True
            # Held messages do not count against the pending offset.
            if BROKEN not in flags and HELD not in flags:
                i += 1
        return False

    def add(self, message):
        """Queue a message for delivery.

        @return: message_id, which is an identifier for the added message.
        """
        filename = self._get_next_message_filename()

        return self._write_message(message, filename)

    def update(self, message):
        """Rewrite C{message} in place, to the file named by its
        C{filename} attribute.

        @return: message_id of the rewritten message.
        """
        return self._write_message(message)

    def _get_next_message_filename(self):
        """Return the path where the next added message should be written.

        Creates the first sub-directory when the store is empty, and a new
        sub-directory once the newest one holds C{directory_size} files.
        """
        message_dirs = self._get_sorted_filenames()
        if message_dirs:
            newest_dir = message_dirs[-1]
        else:
            os.makedirs(self._message_dir("0"))
            newest_dir = "0"

        message_filenames = self._get_sorted_filenames(newest_dir)
        if not message_filenames:
            filename = self._message_dir(newest_dir, "0")
        elif len(message_filenames) < self._directory_size:
            # Next number after the newest file, ignoring its flags suffix.
            filename = str(int(message_filenames[-1].split("_")[0]) + 1)
            filename = self._message_dir(newest_dir, filename)
        else:
            newest_dir = self._message_dir(str(int(newest_dir) + 1))
            os.makedirs(newest_dir)
            filename = posixpath.join(newest_dir, "0")

        return filename

    def _walk_pending_messages(self):
        """Walk the files which are definitely pending."""
        pending_offset = self.get_pending_offset()
        for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)):
            if i >= pending_offset:
                yield filename

    def _walk_messages(self, exclude=None):
        """Yield the path of every message file, in numeric order.

        @param exclude: Optional string of flag characters; files carrying
            any of these flags are skipped.
        """
        if exclude:
            exclude = set(exclude)
        message_dirs = self._get_sorted_filenames()
        for message_dir in message_dirs:
            for filename in self._get_sorted_filenames(message_dir):
                flags = set(self._get_flags(filename))
                if (not exclude or not exclude & flags):
                    yield self._message_dir(message_dir, filename)

    def _get_sorted_filenames(self, dir=""):
        """Return the entries of C{dir} (relative to the store directory),
        sorted by their numeric prefix and without C{.tmp} files.
        """
        message_files = [x for x in os.listdir(self._message_dir(dir))
                         if not x.endswith(".tmp")]
        message_files = sorted(message_files,
            key=lambda x: int(x.split("_")[0]))
        return message_files

    def _message_dir(self, *args):
        """Join C{args} onto the store directory."""
        return posixpath.join(self._directory, *args)

    def _get_content(self, filename):
        """Return the whole content of C{filename}."""
        stream = open(filename)
        try:
            return stream.read()
        finally:
            safe_close(stream, safe=self.safe_file_closing)

    def _get_flags(self, path):
        """Return the flags string encoded in the base name of C{path}."""
        basename = posixpath.basename(path)
        if "_" in basename:
            return basename.split("_")[1]
        return ""

    def _set_flags(self, path, flags):
        """Rename C{path} so that it carries exactly C{flags}.

        @return: The new path of the message file.
        """
        dirname, basename = posixpath.split(path)
        new_path = posixpath.join(dirname, basename.split("_")[0])
        if flags:
            # Flags are stored sorted and without duplicates.
            new_path += "_"+"".join(sorted(set(flags)))
        os.rename(path, new_path)
        if new_path != path:
            # The old name no longer exists; drop its stale cache entry.
            self._message_cache.pop(path, None)
        return new_path

    def _add_flags(self, path, flags):
        """Add C{flags} to the ones already carried by C{path}."""
        self._set_flags(path, self._get_flags(path)+flags)

    def _load_message(self, data):
        """Deserialize message C{data} with bpickle."""
        return bpickle.loads(data)

    def _dump_message(self, message):
        """Serialize C{message} with bpickle."""
        return bpickle.dumps(message)

    def _read_message(self, filename, cache=False):
        """Return the L{Message} stored in C{filename}.

        @param cache: If true and the message is in the cache, return the
            cached copy, whose "data" element is blanked out, instead of
            reading the file.
        """
        #cache basically indicates whether the caller cares about having "data"
        if cache and filename in self._message_cache:
            return Message(self._message_cache[filename],filename)

        data = self._get_content(filename)
        message = self._load_message(data)
        return Message(message, filename)

    def _write_message(self, message, filename=None):
        """Write C{message} to C{filename} and cache it without its data.

        The content goes to a C{.tmp} file first, which is then renamed
        over the final name.

        @param filename: Destination path; defaults to C{message.filename}.
        @return: The inode number of the written file, used as message id.
        """
        if filename is None:
            filename = message.filename

        message_data = self._dump_message(message)

        tmp_filename = filename + ".tmp"
        stream = open(tmp_filename, "w")
        try:
            stream.write(message_data)
        finally:
            # Always release the handle, even if the write failed.
            safe_close(stream, safe=self.safe_file_closing)

        os.rename(tmp_filename, filename)

        #Strip the big data element and shove it in the cache

        temp_message = dict(message)
        if "data" in temp_message:
            temp_message["data"] = None

        self._message_cache[filename] = temp_message

        # For now we use the inode as the message id, as it will work
        # correctly even faced with holding/unholding.  It will break
        # if the store is copied over for some reason, but this shouldn't
        # present an issue given the current uses.  In the future we
        # should have a nice transactional storage (e.g. sqlite) which
        # will offer a more strong primary key.
        return os.stat(filename).st_ino


def got_next_sequence(message_store, next_sequence):
    """Record the sequence number our peer expects from us next.

    Given the message store and the sequence number the peer asked for,
    the pending offset of the store is adjusted according to how that
    number compares with the sequence currently recorded in the store:

    1. The peer expects a sequence greater than the recorded one.  This
       is the common case: the messages before the pending offset are
       deleted and the offset becomes the number of sequences the peer
       advanced by.

    2. The peer expects a sequence older than anything the pending
       offset still accounts for.  Those messages have probably been
       lost, so the offset is reset to zero and everything we have is
       sent again starting at the sequence the peer expects.

    3. The peer expects a sequence we already sent (or the same one as
       before) and which the pending offset still covers.  The offset is
       moved back so that sending resumes from that message.

    In every case the store's sequence is set to C{next_sequence}.

    @return: L{ANCIENT} in case 2, C{None} otherwise.
    """
    result = None
    # How far the peer moved relative to the sequence we have recorded.
    advance = next_sequence - message_store.get_sequence()

    if advance > 0:
        message_store.delete_old_messages()
        new_offset = advance
    else:
        current_offset = message_store.get_pending_offset()
        if advance < -current_offset:
            # "Ancient": The other side wants messages we don't have,
            # so let's just reset our counter to what it expects.
            new_offset = 0
            result = ANCIENT
        else:
            # No messages transferred, or
            # "Old": We'll try to send these old messages that the
            # other side still wants.
            new_offset = current_offset + advance

    message_store.set_pending_offset(new_offset)
    message_store.set_sequence(next_sequence)
    return result