/usr/share/gocode/src/github.com/minio/minio-go/api-get-object.go is in golang-github-minio-minio-go-dev 2.0.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 | /*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
)
// GetObject - returns an seekable, readable object.
func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return nil, err
}
if err := isValidObjectName(objectName); err != nil {
return nil, err
}
var httpReader io.ReadCloser
var objectInfo ObjectInfo
var err error
// Create request channel.
reqCh := make(chan getRequest)
// Create response channel.
resCh := make(chan getResponse)
// Create done channel.
doneCh := make(chan struct{})
// This routine feeds partial object data as and when the caller reads.
go func() {
defer close(reqCh)
defer close(resCh)
// Loop through the incoming control messages and read data.
for {
select {
// When the done channel is closed exit our routine.
case <-doneCh:
// Close the http response body before returning.
// This ends the connection with the server.
if httpReader != nil {
httpReader.Close()
}
return
// Gather incoming request.
case req := <-reqCh:
// If this is the first request we may not need to do a getObject request yet.
if req.isFirstReq {
// First request is a Read/ReadAt.
if req.isReadOp {
// Differentiate between wanting the whole object and just a range.
if req.isReadAt {
// If this is a ReadAt request only get the specified range.
// Range is set with respect to the offset and length of the buffer requested.
// Do not set objectInfo from the first readAt request because it will not get
// the whole object.
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
} else {
// First request is a Read request.
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
}
if err != nil {
resCh <- getResponse{
Error: err,
}
return
}
// Read at least firstReq.Buffer bytes, if not we have
// reached our EOF.
size, err := io.ReadFull(httpReader, req.Buffer)
if err == io.ErrUnexpectedEOF {
// If an EOF happens after reading some but not
// all the bytes ReadFull returns ErrUnexpectedEOF
err = io.EOF
}
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
Size: int(size),
Error: err,
didRead: true,
}
} else {
// First request is a Stat or Seek call.
// Only need to run a StatObject until an actual Read or ReadAt request comes through.
objectInfo, err = c.StatObject(bucketName, objectName)
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the go-routine.
return
}
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
}
}
} else if req.settingObjectInfo { // Request is just to get objectInfo.
objectInfo, err := c.StatObject(bucketName, objectName)
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the goroutine.
return
}
// Send back the objectInfo.
resCh <- getResponse{
objectInfo: objectInfo,
}
} else {
// Offset changes fetch the new object at an Offset.
// Because the httpReader may not be set by the first
// request if it was a stat or seek it must be checked
// if the object has been read or not to only initialize
// new ones when they haven't been already.
// All readAt requests are new requests.
if req.DidOffsetChange || !req.beenRead {
if httpReader != nil {
// Close previously opened http reader.
httpReader.Close()
}
// If this request is a readAt only get the specified range.
if req.isReadAt {
// Range is set with respect to the offset and length of the buffer requested.
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
} else {
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
}
if err != nil {
resCh <- getResponse{
Error: err,
}
return
}
}
// Read at least req.Buffer bytes, if not we have
// reached our EOF.
size, err := io.ReadFull(httpReader, req.Buffer)
if err == io.ErrUnexpectedEOF {
// If an EOF happens after reading some but not
// all the bytes ReadFull returns ErrUnexpectedEOF
err = io.EOF
}
// Reply back how much was read.
resCh <- getResponse{
Size: int(size),
Error: err,
didRead: true,
objectInfo: objectInfo,
}
}
}
}
}()
// Create a newObject through the information sent back by reqCh.
return newObject(reqCh, resCh, doneCh), nil
}
// get request message container to communicate with internal
// go-routine.
type getRequest struct {
Buffer []byte
Offset int64 // readAt offset.
DidOffsetChange bool // Tracks the offset changes for Seek requests.
beenRead bool // Determines if this is the first time an object is being read.
isReadAt bool // Determines if this request is a request to a specific range
isReadOp bool // Determines if this request is a Read or Read/At request.
isFirstReq bool // Determines if this request is the first time an object is being accessed.
settingObjectInfo bool // Determines if this request is to set the objectInfo of an object.
}
// get response message container to reply back for the request.
type getResponse struct {
Size int
Error error
didRead bool // Lets subsequent calls know whether or not httpReader has been initiated.
objectInfo ObjectInfo // Used for the first request.
}
// Object represents an open object. It implements Read, ReadAt,
// Seeker, Close for a HTTP stream.
type Object struct {
// Mutex.
mutex *sync.Mutex
// User allocated and defined.
reqCh chan<- getRequest
resCh <-chan getResponse
doneCh chan<- struct{}
prevOffset int64
currOffset int64
objectInfo ObjectInfo
// Keeps track of closed call.
isClosed bool
// Keeps track of if this is the first call.
isStarted bool
// Previous error saved for future calls.
prevErr error
// Keeps track of if this object has been read yet.
beenRead bool
// Keeps track of if objectInfo has been set yet.
objectInfoSet bool
}
// doGetRequest - sends and blocks on the firstReqCh and reqCh of an object.
// Returns back the size of the buffer read, if anything was read, as well
// as any error encountered. For all first requests sent on the object
// it is also responsible for sending back the objectInfo.
func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
o.reqCh <- request
response := <-o.resCh
// This was the first request.
if !o.isStarted {
// The object has been operated on.
o.isStarted = true
}
// Set the objectInfo if the request was not readAt
// and it hasn't been set before.
if !o.objectInfoSet && !request.isReadAt {
o.objectInfo = response.objectInfo
o.objectInfoSet = true
}
// Set beenRead only if it has not been set before.
if !o.beenRead {
o.beenRead = response.didRead
}
// Return any error to the top level.
if response.Error != nil {
return response, response.Error
}
return response, nil
}
// setOffset - handles the setting of offsets for
// Read/ReadAt/Seek requests.
func (o *Object) setOffset(bytesRead int64) error {
// Update the currentOffset.
o.currOffset += bytesRead
// Save the current offset as previous offset.
o.prevOffset = o.currOffset
if o.currOffset >= o.objectInfo.Size {
return io.EOF
}
return nil
}
// Read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered. Returns
// io.EOF upon end of file.
func (o *Object) Read(b []byte) (n int, err error) {
if o == nil {
return 0, ErrInvalidArgument("Object is nil")
}
// Locking.
o.mutex.Lock()
defer o.mutex.Unlock()
// prevErr is previous error saved from previous operation.
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
// Create a new request.
readReq := getRequest{
isReadOp: true,
beenRead: o.beenRead,
Buffer: b,
}
// Alert that this is the first request.
if !o.isStarted {
readReq.isFirstReq = true
}
// Verify if offset has changed and currOffset is greater than
// previous offset. Perhaps due to Seek().
offsetChange := o.prevOffset - o.currOffset
if offsetChange < 0 {
offsetChange = -offsetChange
}
if offsetChange > 0 {
// Fetch the new reader at the current offset again.
readReq.Offset = o.currOffset
readReq.DidOffsetChange = true
} else {
// No offset changes no need to fetch new reader, continue
// reading.
readReq.DidOffsetChange = false
readReq.Offset = 0
}
// Send and receive from the first request.
response, err := o.doGetRequest(readReq)
if err != nil && err != io.EOF {
// Save the error for future calls.
o.prevErr = err
return response.Size, err
}
// Bytes read.
bytesRead := int64(response.Size)
// Set the new offset.
oerr := o.setOffset(bytesRead)
if oerr != nil {
// Save the error for future calls.
o.prevErr = oerr
return response.Size, oerr
}
// Return the response.
return response.Size, err
}
// Stat returns the ObjectInfo structure describing object.
func (o *Object) Stat() (ObjectInfo, error) {
if o == nil {
return ObjectInfo{}, ErrInvalidArgument("Object is nil")
}
// Locking.
o.mutex.Lock()
defer o.mutex.Unlock()
if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
return ObjectInfo{}, o.prevErr
}
// This is the first request.
if !o.isStarted || !o.objectInfoSet {
statReq := getRequest{
isFirstReq: !o.isStarted,
settingObjectInfo: !o.objectInfoSet,
}
// Send the request and get the response.
_, err := o.doGetRequest(statReq)
if err != nil {
o.prevErr = err
return ObjectInfo{}, err
}
}
return o.objectInfo, nil
}
// ReadAt reads len(b) bytes from the File starting at byte offset
// off. It returns the number of bytes read and the error, if any.
// ReadAt always returns a non-nil error when n < len(b). At end of
// file, that error is io.EOF.
func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
if o == nil {
return 0, ErrInvalidArgument("Object is nil")
}
// Locking.
o.mutex.Lock()
defer o.mutex.Unlock()
// prevErr is error which was saved in previous operation.
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
// Can only compare offsets to size when size has been set.
if o.objectInfoSet {
// If offset is negative than we return io.EOF.
// If offset is greater than or equal to object size we return io.EOF.
if offset >= o.objectInfo.Size || offset < 0 {
return 0, io.EOF
}
}
// Create the new readAt request.
readAtReq := getRequest{
isReadOp: true,
isReadAt: true,
DidOffsetChange: true, // Offset always changes.
beenRead: o.beenRead, // Set if this is the first request to try and read.
Offset: offset, // Set the offset.
Buffer: b,
}
// Alert that this is the first request.
if !o.isStarted {
readAtReq.isFirstReq = true
}
// Send and receive from the first request.
response, err := o.doGetRequest(readAtReq)
if err != nil && err != io.EOF {
// Save the error.
o.prevErr = err
return response.Size, err
}
// Bytes read.
bytesRead := int64(response.Size)
// There is no valid objectInfo yet
// to compare against for EOF.
if !o.objectInfoSet {
// Update the currentOffset.
o.currOffset += bytesRead
// Save the current offset as previous offset.
o.prevOffset = o.currOffset
} else {
// If this was not the first request update
// the offsets and compare against objectInfo
// for EOF.
oerr := o.setOffset(bytesRead)
if oerr != nil {
o.prevErr = oerr
return response.Size, oerr
}
}
return response.Size, err
}
// Seek sets the offset for the next Read or Write to offset,
// interpreted according to whence: 0 means relative to the
// origin of the file, 1 means relative to the current offset,
// and 2 means relative to the end.
// Seek returns the new offset and an error, if any.
//
// Seeking to a negative offset is an error. Seeking to any positive
// offset is legal, subsequent io operations succeed until the
// underlying object is not closed.
func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
if o == nil {
return 0, ErrInvalidArgument("Object is nil")
}
// Locking.
o.mutex.Lock()
defer o.mutex.Unlock()
if o.prevErr != nil {
// At EOF seeking is legal allow only io.EOF, for any other errors we return.
if o.prevErr != io.EOF {
return 0, o.prevErr
}
}
// Negative offset is valid for whence of '2'.
if offset < 0 && whence != 2 {
return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
}
// This is the first request. So before anything else
// get the ObjectInfo.
if !o.isStarted || !o.objectInfoSet {
// Create the new Seek request.
seekReq := getRequest{
isReadOp: false,
Offset: offset,
isFirstReq: true,
}
// Send and receive from the seek request.
_, err := o.doGetRequest(seekReq)
if err != nil {
// Save the error.
o.prevErr = err
return 0, err
}
}
// Save current offset as previous offset.
o.prevOffset = o.currOffset
// Switch through whence.
switch whence {
default:
return 0, ErrInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))
case 0:
if offset > o.objectInfo.Size {
return 0, io.EOF
}
o.currOffset = offset
case 1:
if o.currOffset+offset > o.objectInfo.Size {
return 0, io.EOF
}
o.currOffset += offset
case 2:
// Seeking to positive offset is valid for whence '2', but
// since we are backing a Reader we have reached 'EOF' if
// offset is positive.
if offset > 0 {
return 0, io.EOF
}
// Seeking to negative position not allowed for whence.
if o.objectInfo.Size+offset < 0 {
return 0, ErrInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence))
}
o.currOffset = o.objectInfo.Size + offset
}
// Reset the saved error since we successfully seeked, let the Read
// and ReadAt decide.
if o.prevErr == io.EOF {
o.prevErr = nil
}
// Return the effective offset.
return o.currOffset, nil
}
// Close - The behavior of Close after the first call returns error
// for subsequent Close() calls.
func (o *Object) Close() (err error) {
if o == nil {
return ErrInvalidArgument("Object is nil")
}
// Locking.
o.mutex.Lock()
defer o.mutex.Unlock()
// if already closed return an error.
if o.isClosed {
return o.prevErr
}
// Close successfully.
close(o.doneCh)
// Save for future operations.
errMsg := "Object is already closed. Bad file descriptor."
o.prevErr = errors.New(errMsg)
// Save here that we closed done channel successfully.
o.isClosed = true
return nil
}
// newObject instantiates a new *minio.Object*
// ObjectInfo will be set by setObjectInfo
func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
return &Object{
mutex: &sync.Mutex{},
reqCh: reqCh,
resCh: resCh,
doneCh: doneCh,
}
}
// getObject - retrieve object from Object Storage.
//
// Additionally this function also takes range arguments to download the specified
// range bytes of an object. Setting offset and length = 0 will download the full object.
//
// For more information about the HTTP Range header.
// go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.
func (c Client) getObject(bucketName, objectName string, offset, length int64) (io.ReadCloser, ObjectInfo, error) {
// Validate input arguments.
if err := isValidBucketName(bucketName); err != nil {
return nil, ObjectInfo{}, err
}
if err := isValidObjectName(objectName); err != nil {
return nil, ObjectInfo{}, err
}
customHeader := make(http.Header)
// Set ranges if length and offset are valid.
// See https://tools.ietf.org/html/rfc7233#section-3.1 for reference.
if length > 0 && offset >= 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
} else if offset > 0 && length == 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d-", offset))
} else if length < 0 && offset == 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d", length))
}
// Execute GET on objectName.
resp, err := c.executeMethod("GET", requestMetadata{
bucketName: bucketName,
objectName: objectName,
customHeader: customHeader,
})
if err != nil {
return nil, ObjectInfo{}, err
}
if resp != nil {
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
return nil, ObjectInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
}
}
// Trim off the odd double quotes from ETag in the beginning and end.
md5sum := strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
md5sum = strings.TrimSuffix(md5sum, "\"")
// Parse the date.
date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified"))
if err != nil {
msg := "Last-Modified time format not recognized. " + reportIssue
return nil, ObjectInfo{}, ErrorResponse{
Code: "InternalError",
Message: msg,
RequestID: resp.Header.Get("x-amz-request-id"),
HostID: resp.Header.Get("x-amz-id-2"),
Region: resp.Header.Get("x-amz-bucket-region"),
}
}
// Get content-type.
contentType := strings.TrimSpace(resp.Header.Get("Content-Type"))
if contentType == "" {
contentType = "application/octet-stream"
}
var objectStat ObjectInfo
objectStat.ETag = md5sum
objectStat.Key = objectName
objectStat.Size = resp.ContentLength
objectStat.LastModified = date
objectStat.ContentType = contentType
// do not close body here, caller will close
return resp.Body, objectStat, nil
}
|