This file is indexed.

/usr/lib/python3/dist-packages/S3/MultiPart.py is in s3cmd 2.0.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# -*- coding: utf-8 -*-

## Amazon S3 Multipart upload support
## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
## License: GPL Version 2

from __future__ import absolute_import

import os
import sys
from stat import ST_SIZE
from logging import debug, info, warning, error
from .Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, deunicodise, calculateChecksum, parseNodes, encode_to_s3

class MultiPartUpload(object):
    """
    Drive one S3 multipart upload: initiate (or resume) it, upload the
    individual parts and finally ask S3 to assemble them.

    Attributes set by this class:
      s3               - client object; this class uses its ``config``,
                         ``create_request``, ``send_request``, ``send_file``,
                         ``list_multipart`` and ``get_multipart`` members
      file_stream      - source stream; ``stream_name`` and ``read`` are used
      uri              - destination URI object
      parts            - dict mapping part number (int) to the part's ETag
      headers_baseline - headers sent with the initiate request
      upload_id        - id of the multipart upload being worked on
      chunk_size       - part size in bytes (set by upload_all_parts)
    """

    MIN_CHUNK_SIZE_MB = 5       # 5MB
    MAX_CHUNK_SIZE_MB = 5120    # 5GB
    MAX_FILE_SIZE = 5 * 1024 ** 4 # 5TB (5497558138880 bytes)

    def __init__(self, s3, file_stream, uri, headers_baseline=None):
        """
        Store the collaborators and immediately initiate (or pick up) the
        multipart upload, so ``upload_id`` is available after construction.
        """
        self.s3 = s3
        self.file_stream = file_stream
        self.uri = uri
        self.parts = {}
        self.headers_baseline = headers_baseline or {}
        self.upload_id = self.initiate_multipart_upload()

    def get_parts_information(self, uri, upload_id):
        """
        List the parts already stored remotely for ``upload_id``.

        Returns a dict keyed by part number (int); each value is a dict with
        the remote 'checksum' (the ETag as reported) and 'size'.
        """
        multipart_response = self.s3.list_multipart(uri, upload_id)
        tree = getTreeFromXml(multipart_response['data'])

        parts = dict()
        for elem in parseNodes(tree):
            try:
                parts[int(elem['PartNumber'])] = {'checksum': elem['ETag'], 'size': elem['Size']}
            except KeyError:
                # Node lacks one of the part fields: not a part entry, skip it.
                pass

        return parts

    def get_unique_upload_id(self, uri):
        """
        Find the single in-progress multipart upload whose key equals
        ``uri.object()``.

        Returns its UploadId, or None when there is no such upload.
        Raises ValueError when more than one upload matches, because the
        one to continue cannot be chosen automatically.
        """
        upload_id = None
        multipart_response = self.s3.get_multipart(uri)
        tree = getTreeFromXml(multipart_response['data'])
        for mpupload in parseNodes(tree):
            try:
                mp_upload_id = mpupload['UploadId']
                mp_path = mpupload['Key']
                info("mp_path: %s, object: %s" % (mp_path, uri.object()))
                if mp_path == uri.object():
                    if upload_id is not None:
                        raise ValueError("More than one UploadId for URI %s.  Disable multipart upload, or use\n %s multipart %s\nto list the Ids, then pass a unique --upload-id into the put command." % (uri, sys.argv[0], uri))
                    upload_id = mp_upload_id
            except KeyError:
                # Node without UploadId/Key: not an upload entry, skip it.
                pass

        return upload_id

    def initiate_multipart_upload(self):
        """
        Begin a multipart upload
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html

        The upload id is taken, in order of preference, from
        ``config.upload_id``, from an existing remote upload for the same
        key (only when ``config.put_continue`` is set), or from a freshly
        sent initiate request. It is stored in ``self.upload_id`` and
        returned.
        """
        if self.s3.config.upload_id is not None:
            self.upload_id = self.s3.config.upload_id
        elif self.s3.config.put_continue:
            self.upload_id = self.get_unique_upload_id(self.uri)
        else:
            self.upload_id = None

        if self.upload_id is None:
            # Nothing to resume: ask the server for a new upload id.
            request = self.s3.create_request("OBJECT_POST", uri = self.uri,
                                             headers = self.headers_baseline,
                                             uri_params = {'uploads': None})
            response = self.s3.send_request(request)
            data = response["data"]
            self.upload_id = getTextFromXml(data, "UploadId")

        return self.upload_id

    def upload_all_parts(self, extra_label=''):
        """
        Execute a full multipart upload on a file
        Returns the seq/etag dict (``self.parts``)
        TODO use num_processes to thread it

        A named file is split by offset using its size on disk; a stream
        named "<stdin>" is read chunk by chunk until it returns no data.
        Raises RuntimeError when no upload id is set. Any failure while
        uploading a part is logged with recovery instructions and re-raised.
        """
        if not self.upload_id:
            raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")

        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
        filename = self.file_stream.stream_name
        from_stdin = (filename == u"<stdin>")

        if from_stdin:
            debug("MultiPart: Uploading from %s" % filename)
        else:
            size_left = file_size = os.stat(deunicodise(filename))[ST_SIZE]
            # Number of full chunks, plus one for a trailing partial chunk.
            nr_parts = file_size // self.chunk_size + (1 if file_size % self.chunk_size else 0)
            debug("MultiPart: Uploading %s in %d parts" % (filename, nr_parts))

        remote_statuses = dict()
        if self.s3.config.put_continue:
            remote_statuses = self.get_parts_information(self.uri, self.upload_id)

        if extra_label:
            extra_label = u' ' + extra_label
        seq = 1
        if not from_stdin:
            while size_left > 0:
                offset = self.chunk_size * (seq - 1)
                current_chunk_size = min(file_size - offset, self.chunk_size)
                size_left -= current_chunk_size
                labels = {
                    'source' : filename,
                    'destination' : self.uri.uri(),
                    'extra' : "[part %d of %d, %s]%s" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True), extra_label)
                }
                try:
                    self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses.get(seq))
                except:
                    # Deliberately bare: whatever stops the upload, tell the
                    # user how to abort or resume it, then propagate.
                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort the upload, or\n  %s --upload-id %s put ...\nto continue the upload."
                          % (filename, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                    raise
                seq += 1
        else:
            while True:
                buffer = self.file_stream.read(self.chunk_size)
                if len(buffer) == 0: # EOF
                    break
                offset = 0 # send from start of the buffer
                current_chunk_size = len(buffer)
                labels = {
                    'source' : filename,
                    'destination' : self.uri.uri(),
                    'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
                }
                try:
                    self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses.get(seq))
                except:
                    # Deliberately bare: see the file branch above.
                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort, or\n  %s --upload-id %s put ...\nto continue the upload."
                          % (filename, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                    raise
                seq += 1

        debug("MultiPart: Upload finished: %d parts", seq - 1)
        return self.parts

    def upload_part(self, seq, offset, chunk_size, labels, buffer = '', remote_status = None):
        """
        Upload a file chunk
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html

        seq           - part number
        offset        - start of the chunk within the stream/buffer
        chunk_size    - number of bytes in this part
        labels        - progress labels passed through to ``s3.send_file``
        buffer        - chunk data when it was already read by the caller
        remote_status - dict with 'size' and 'checksum' of a part already
                        stored remotely, or None

        When ``remote_status`` matches the local chunk in both size and
        checksum the part is not sent again and None is returned; otherwise
        the part is uploaded and the ``send_file`` response is returned.
        In both cases ``self.parts[seq]`` is set to the part's ETag.
        """
        # TODO implement Content-MD5
        debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))

        if remote_status is not None:
            if int(remote_status['size']) == chunk_size:
                checksum = calculateChecksum(buffer, self.file_stream, offset, chunk_size, self.s3.config.send_chunk)
                remote_checksum = remote_status['checksum'].strip('"\'')
                if remote_checksum == checksum:
                    warning("MultiPart: size and md5sum match for %s part %d, skipping." % (self.uri, seq))
                    # Stored exactly as reported remotely (quotes not stripped).
                    self.parts[seq] = remote_status['checksum']
                    return
                else:
                    warning("MultiPart: checksum (%s vs %s) does not match for %s part %d, reuploading."
                            % (remote_checksum, checksum, self.uri, seq))
            else:
                warning("MultiPart: size (%d vs %d) does not match for %s part %d, reuploading."
                        % (int(remote_status['size']), chunk_size, self.uri, seq))

        headers = { "content-length": str(chunk_size) }
        query_string_params = {'partNumber':'%s' % seq,
                               'uploadId': self.upload_id}
        request = self.s3.create_request("OBJECT_PUT", uri = self.uri,
                                         headers = headers,
                                         uri_params = query_string_params)
        response = self.s3.send_file(request, self.file_stream, labels, buffer, offset = offset, chunk_size = chunk_size)
        self.parts[seq] = response["headers"].get('etag', '').strip('"\'')
        return response

    def complete_multipart_upload(self):
        """
        Finish a multipart upload
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html

        Sends the recorded part numbers and ETags, in ascending part-number
        order, and returns the response of that request.
        """
        debug("MultiPart: Completing upload: %s" % self.upload_id)

        part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
        # The part list must be in ascending part-number order; do not rely
        # on the iteration order of the dict.
        parts_xml = [part_xml % (seq, self.parts[seq]) for seq in sorted(self.parts)]
        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))

        headers = { "content-length": str(len(body)) }
        request = self.s3.create_request("OBJECT_POST", uri = self.uri,
                                         headers = headers, body = body,
                                         uri_params = {'uploadId': self.upload_id})
        response = self.s3.send_request(request)

        return response

    def abort_upload(self):
        """
        Abort multipart upload
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html

        The DELETE request is currently commented out, so this only logs
        the intent and returns None; nothing is sent to the server.
        """
        debug("MultiPart: Aborting upload: %s" % self.upload_id)
        #request = self.s3.create_request("OBJECT_DELETE", uri = self.uri,
        #                                  uri_params = {'uploadId': self.upload_id})
        #response = self.s3.send_request(request)
        response = None
        return response

# vim:et:ts=4:sts=4:ai