mirror of https://github.com/rclone/rclone.git synced 2026-01-08 11:33:33 +00:00

Compare commits


12 Commits
v1.11 ... v1.13

Author SHA1 Message Date
Nick Craig-Wood 88293a4b8a Version v1.13 2015-05-10 12:39:06 +01:00
Nick Craig-Wood 981104519e Revise documentation (especially sync) - fixes #39 2015-05-10 12:17:24 +01:00
Nick Craig-Wood 1d254a3674 Implement --timeout and --conntimeout - fixes #54 (NB dropbox still to do) 2015-05-10 11:29:55 +01:00
Nick Craig-Wood f88d171afd s3: ignore etags from multipart uploads which aren't md5sums - fixes #56 2015-05-10 11:29:55 +01:00
Nick Craig-Wood ba2091725e Version v1.12 2015-03-15 15:55:38 +00:00
Nick Craig-Wood 7c120b8bc5 drive: add --drive-chunk-size and --drive-upload-cutoff parameters 2015-03-15 15:27:55 +00:00
Nick Craig-Wood 5cc5429f99 drive: switch to insert from update when a failed copy deletes the upload 2015-03-15 15:27:55 +00:00
Nick Craig-Wood 09d71239b6 Make file size render more neatly and prevent from being < 0 2015-03-15 15:27:55 +00:00
Nick Craig-Wood c643e4585e core: Log duplicate files if they are detected 2015-03-15 15:27:55 +00:00
Nick Craig-Wood 873db29391 Log all objects more informatively 2015-03-15 15:27:55 +00:00
Nick Craig-Wood 81a933ae38 drive: Use chunked upload for files - fixes #33 2015-03-15 15:27:55 +00:00
Nick Craig-Wood ecb3c7bcef drive, googlecloudstorage: remove SeekWrapper after googleapi fix 2015-03-04 20:47:59 +00:00
18 changed files with 570 additions and 190 deletions

View File

@@ -75,16 +75,16 @@ Subcommands
     rclone copy source:path dest:path
         Copy the source to the destination. Doesn't transfer
-        unchanged files, testing first by modification time then by
+        unchanged files, testing by size and modification time or
         MD5SUM. Doesn't delete files from the destination.
     rclone sync source:path dest:path
-        Sync the source to the destination. Doesn't transfer
-        unchanged files, testing first by modification time then by
-        size. Deletes any files that exist in source that don't
-        exist in destination. Since this can cause data loss, test
-        first with the `--dry-run` flag.
+        Sync the source to the destination, changing the destination
+        only. Doesn't transfer unchanged files, testing by size and
+        modification time or MD5SUM. Destination is updated to match
+        source, including deleting files if necessary. Since this can
+        cause data loss, test first with the `--dry-run` flag.
     rclone ls [remote:path]
@@ -96,7 +96,8 @@ List all directories/containers/buckets in the the path.
     rclone lsl [remote:path]
-        List all the objects in the the path with modification time, size and path.
+        List all the objects in the the path with modification time,
+        size and path.
     rclone md5sum [remote:path]
@@ -133,14 +134,16 @@ This help.
 General options:
 ```
-    --bwlimit=0: Bandwidth limit in kBytes/s, or use suffix K|M|G
+    --bwlimit=0: Bandwidth limit in kBytes/s, or use suffix k|M|G
     --checkers=8: Number of checkers to run in parallel.
     --config="~/.rclone.conf": Config file.
+    --contimeout=1m0s: Connect timeout
 -n, --dry-run=false: Do a trial run with no permanent changes
     --log-file="": Log everything to this file
     --modify-window=1ns: Max time diff to be considered the same
 -q, --quiet=false: Print as little stuff as possible
     --stats=1m0s: Interval to print stats (0 to disable)
+    --timeout=5m0s: IO idle timeout
     --transfers=4: Number of file transfers to run in parallel.
 -v, --verbose=false: Print lots more stuff
 -V, --version=false: Print the version number
@@ -172,12 +175,7 @@ So to copy a local directory to a swift container called backup:
     rclone sync /home/source swift:backup
-The modified time is stored as metadata on the object as
-`X-Object-Meta-Mtime` as floating point since the epoch.
-This is a defacto standard (used in the official python-swiftclient
-amongst others) for storing the modification time (as read using
-os.Stat) for an object.
+For more help see the [online docs on Openstack Swift](http://rclone.org/swift).
 Amazon S3
 ---------
@@ -189,8 +187,7 @@ So to copy a local directory to a s3 container called backup
     rclone sync /home/source s3:backup
-The modified time is stored as metadata on the object as
-`X-Amz-Meta-Mtime` as floating point since the epoch.
+For more help see the [online docs on Amazon S3](http://rclone.org/s3).
 Google drive
 ------------
@@ -205,7 +202,7 @@ To copy a local directory to a drive directory called backup
     rclone copy /home/source remote:backup
-Google drive stores modification times accurate to 1 ms natively.
+For more help see the [online docs on Google Drive](http://rclone.org/drive).
 Dropbox
 -------
@@ -220,10 +217,7 @@ To copy a local directory to a drive directory called backup
     rclone copy /home/source dropbox:backup
-Md5sums and timestamps in RFC3339 format accurate to 1ns are stored in
-a Dropbox datastore called "rclone". Dropbox datastores are limited
-to 100,000 rows so this is the maximum number of files rclone can
-manage on Dropbox.
+For more help see the [online docs on Dropbox](http://rclone.org/dropbox).
 Google Cloud Storage
 --------------------
@@ -239,9 +233,7 @@ To copy a local directory to a google cloud storage directory called backup
     rclone copy /home/source remote:backup
-Google google cloud storage stores md5sums natively and rclone stores
-modification times as metadata on the object, under the "mtime" key in
-RFC3339 format accurate to 1ns.
+For more help see the [online docs on Google Cloud Storage](http://rclone.org/googlecloudstorage/).
 Single file copies
 ------------------
@@ -270,6 +262,15 @@ Bugs
 Changelog
 ---------
+* v1.13 - 2015-05-10
+  * Revise documentation (especially sync)
+  * Implement --timeout and --conntimeout
+  * s3: ignore etags from multipart uploads which aren't md5sums
+* v1.12 - 2015-03-15
+  * drive: Use chunked upload for files above a certain size
+  * drive: add --drive-chunk-size and --drive-upload-cutoff parameters
+  * drive: switch to insert from update when a failed copy deletes the upload
+  * core: Log duplicate files if they are detected
 * v1.11 - 2015-03-04
   * swift: add region parameter
   * drive: fix crash on failed to update remote mtime

View File

@@ -58,16 +58,16 @@ Subcommands
     rclone copy source:path dest:path
         Copy the source to the destination. Doesn't transfer
-        unchanged files, testing first by modification time then by
+        unchanged files, testing by size and modification time or
         MD5SUM. Doesn't delete files from the destination.
     rclone sync source:path dest:path
-        Sync the source to the destination. Doesn't transfer
-        unchanged files, testing first by modification time then by
-        size. Deletes any files that exist in source that don't
-        exist in destination. Since this can cause data loss, test
-        first with the `--dry-run` flag.
+        Sync the source to the destination, changing the destination
+        only. Doesn't transfer unchanged files, testing by size and
+        modification time or MD5SUM. Destination is updated to match
+        source, including deleting files if necessary. Since this can
+        cause data loss, test first with the `--dry-run` flag.
     rclone ls [remote:path]
@@ -79,7 +79,8 @@ List all directories/containers/buckets in the the path.
     rclone lsl [remote:path]
-        List all the objects in the the path with modification time, size and path.
+        List all the objects in the the path with modification time,
+        size and path.
     rclone md5sum [remote:path]
@@ -114,14 +115,16 @@ Enter an interactive configuration session.
 This help.
 ```
-    --bwlimit=0: Bandwidth limit in kBytes/s, or use suffix K|M|G
+    --bwlimit=0: Bandwidth limit in kBytes/s, or use suffix k|M|G
     --checkers=8: Number of checkers to run in parallel.
     --config="~/.rclone.conf": Config file.
+    --contimeout=1m0s: Connect timeout
 -n, --dry-run=false: Do a trial run with no permanent changes
     --log-file="": Log everything to this file
     --modify-window=1ns: Max time diff to be considered the same
 -q, --quiet=false: Print as little stuff as possible
     --stats=1m0s: Interval to print stats (0 to disable)
+    --timeout=5m0s: IO idle timeout
     --transfers=4: Number of file transfers to run in parallel.
 -v, --verbose=false: Print lots more stuff
 -V, --version=false: Print the version number

View File

@@ -2,34 +2,34 @@
 title: "Rclone downloads"
 description: "Download rclone binaries for your OS."
 type: page
-date: "2015-03-04"
+date: "2015-05-10"
 ---
-Rclone Download v1.11
+Rclone Download v1.13
 =====================
 * Windows
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-windows-386.zip)
-  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.11-windows-amd64.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-windows-386.zip)
+  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-windows-amd64.zip)
 * OSX
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-osx-386.zip)
-  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.11-osx-amd64.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-osx-386.zip)
+  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-osx-amd64.zip)
 * Linux
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-linux-386.zip)
-  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.11-linux-amd64.zip)
-  * [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.11-linux-arm.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-linux-386.zip)
+  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-linux-amd64.zip)
+  * [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-linux-arm.zip)
 * FreeBSD
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-freebsd-386.zip)
-  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.11-freebsd-amd64.zip)
-  * [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.11-freebsd-arm.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-386.zip)
+  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-amd64.zip)
+  * [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-arm.zip)
 * NetBSD
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-netbsd-386.zip)
-  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.11-netbsd-amd64.zip)
-  * [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.11-netbsd-arm.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-386.zip)
+  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-amd64.zip)
+  * [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-arm.zip)
 * OpenBSD
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-openbsd-386.zip)
-  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.11-openbsd-amd64.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-openbsd-386.zip)
+  * [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-openbsd-amd64.zip)
 * Plan 9
-  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.11-plan9-386.zip)
+  * [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-plan9-386.zip)
 Older downloads can be found [here](http://downloads.rclone.org/)

View File

@@ -1,7 +1,7 @@
 ---
 title: "Google drive"
 description: "Rclone docs for Google drive"
-date: "2014-04-26"
+date: "2015-05-10"
 ---
 <i class="fa fa-google"></i> Google Drive
@@ -73,3 +73,16 @@ Modified time
 -------------
 Google drive stores modification times accurate to 1 ms.
+Revisions
+---------
+Google drive stores revisions of files. When you upload a change to
+an existing file to google drive using rclone it will create a new
+revision of that file.
+Revisions follow the standard google policy which at time of writing
+was
+* They are deleted after 30 days or 100 revisions (whatever comes first).
+* They do not count towards a user storage quota.

View File

@@ -39,6 +39,10 @@ const (
 var (
 	// Flags
 	driveFullList = pflag.BoolP("drive-full-list", "", true, "Use a full listing for directory list. More data but usually quicker.")
+	// chunkSize is the size of the chunks created during a resumable upload and should be a power of two.
+	// 1<<18 is the minimum size supported by the Google uploader, and there is no maximum.
+	chunkSize         = fs.SizeSuffix(256 * 1024)
+	driveUploadCutoff = chunkSize
 	// Description of how to auth for this app
 	driveAuth = &googleauth.Auth{
 		Scope: "https://www.googleapis.com/auth/drive",
@@ -63,6 +67,8 @@ func init() {
 			Help: "Google Application Client Secret - leave blank to use rclone's.",
 		}},
 	})
+	pflag.VarP(&driveUploadCutoff, "drive-upload-cutoff", "", "Cutoff for switching to chunked upload")
+	pflag.VarP(&chunkSize, "drive-chunk-size", "", "Upload chunk size. Must a power of 2 >= 256k.")
 }
 // FsDrive represents a remote drive server
@@ -183,24 +189,41 @@ func (f *FsDrive) endCall(err error) bool {
 			fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
 		}
 	} else {
-		fs.Debug(f, "Error recived: %v", err)
-		if gerr, ok := err.(*googleapi.Error); ok {
-			if len(gerr.Errors) > 0 {
+		fs.Debug(f, "Error recived: %T %#v", err, err)
+		// Check for net error Timeout()
+		if x, ok := err.(interface {
+			Timeout() bool
+		}); ok && x.Timeout() {
+			again = true
+		}
+		// Check for net error Temporary()
+		if x, ok := err.(interface {
+			Temporary() bool
+		}); ok && x.Temporary() {
+			again = true
+		}
+		switch gerr := err.(type) {
+		case *googleapi.Error:
+			if gerr.Code >= 500 && gerr.Code < 600 {
+				// All 5xx errors should be retried
+				again = true
+			} else if len(gerr.Errors) > 0 {
 				reason := gerr.Errors[0].Reason
 				if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
-					f.sleepTime *= 2
-					if f.sleepTime > maxSleep {
-						f.sleepTime = maxSleep
-					}
-					if f.sleepTime != oldSleepTime {
-						fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
-					}
 					again = true
 				}
 			}
 		}
 	}
+	if again {
+		f.sleepTime *= 2
+		if f.sleepTime > maxSleep {
+			f.sleepTime = maxSleep
+		}
+		if f.sleepTime != oldSleepTime {
+			fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
		}
+	}
 	return again
 }
@@ -276,8 +299,27 @@ OUTER:
 	return
 }
+// Returns true of x is a power of 2 or zero
+func isPowerOfTwo(x int64) bool {
+	switch {
+	case x == 0:
+		return true
+	case x < 0:
+		return false
+	default:
+		return (x & (x - 1)) == 0
+	}
+}
 // NewFs contstructs an FsDrive from the path, container:path
 func NewFs(name, path string) (fs.Fs, error) {
+	if !isPowerOfTwo(int64(chunkSize)) {
+		return nil, fmt.Errorf("drive: chunk size %v isn't a power of two", chunkSize)
+	}
+	if chunkSize < 256*1024 {
+		return nil, fmt.Errorf("drive: chunk size can't be less than 256k - was %v", chunkSize)
+	}
 	t, err := driveAuth.NewTransport(name)
 	if err != nil {
 		return nil, err
@@ -712,17 +754,24 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
 		ModifiedDate: modTime.Format(timeFormatOut),
 	}
-	// Make the API request to upload metadata and file data.
-	in = &fs.SeekWrapper{In: in, Size: size}
 	var info *drive.File
-	// Don't retry, return a retry error instead
-	f.beginCall()
-	info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
-	if f.endCall(err) {
-		return o, fs.RetryErrorf("Upload failed - retry: %s", err)
-	}
-	if err != nil {
-		return o, fmt.Errorf("Upload failed: %s", err)
+	if size == 0 || size < int64(driveUploadCutoff) {
+		// Make the API request to upload metadata and file data.
+		// Don't retry, return a retry error instead
+		f.beginCall()
+		info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
+		if f.endCall(err) {
+			return o, fs.RetryErrorf("Upload failed - retry: %s", err)
+		}
+		if err != nil {
+			return o, fmt.Errorf("Upload failed: %s", err)
+		}
+	} else {
+		// Upload the file in chunks
+		info, err = f.Upload(in, size, createInfo.MimeType, createInfo, remote)
+		if err != nil {
+			return o, err
+		}
 	}
 	o.setMetaData(info)
 	return o, nil
@@ -944,17 +993,24 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
 	}
 	// Make the API request to upload metadata and file data.
-	in = &fs.SeekWrapper{In: in, Size: size}
 	var err error
 	var info *drive.File
-	// Don't retry, return a retry error instead
-	o.drive.beginCall()
-	info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
-	if o.drive.endCall(err) {
-		return fs.RetryErrorf("Update failed - retry: %s", err)
-	}
-	if err != nil {
-		return fmt.Errorf("Update failed: %s", err)
+	if size == 0 || size < int64(driveUploadCutoff) {
+		// Don't retry, return a retry error instead
+		o.drive.beginCall()
+		info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
+		if o.drive.endCall(err) {
+			return fs.RetryErrorf("Update failed - retry: %s", err)
+		}
+		if err != nil {
+			return fmt.Errorf("Update failed: %s", err)
+		}
+	} else {
+		// Upload the file in chunks
+		info, err = o.drive.Upload(in, size, fs.MimeType(o), updateInfo, o.remote)
+		if err != nil {
+			return err
+		}
 	}
 	o.setMetaData(info)
 	return nil
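
The retry logic in `endCall` above leans on Go's ad-hoc interface assertions: any error exposing a `Timeout() bool` or `Temporary() bool` method (as `net.Error` implementations do) is marked for retry without importing the concrete error types. A minimal standalone sketch of the pattern — `isRetryable` is our illustrative name, not an rclone function:

```
package main

import (
	"errors"
	"fmt"
	"net"
)

// isRetryable mirrors the checks in endCall: an error is worth
// retrying if it reports itself as a timeout or a temporary condition.
func isRetryable(err error) bool {
	// Matches any error type with a Timeout() bool method, e.g. net.Error
	if x, ok := err.(interface {
		Timeout() bool
	}); ok && x.Timeout() {
		return true
	}
	// Matches any error type with a Temporary() bool method
	if x, ok := err.(interface {
		Temporary() bool
	}); ok && x.Temporary() {
		return true
	}
	return false
}

func main() {
	// A DNS timeout satisfies net.Error, so the Timeout() check fires
	fmt.Println(isRetryable(&net.DNSError{Name: "example.com", IsTimeout: true})) // true
	fmt.Println(isRetryable(errors.New("plain error")))                           // false
}
```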

drive/upload.go Normal file (246 lines)
View File

@@ -0,0 +1,246 @@
// Upload for drive
//
// Docs
// Resumable upload: https://developers.google.com/drive/web/manage-uploads#resumable
// Best practices: https://developers.google.com/drive/web/manage-uploads#best-practices
// Files insert: https://developers.google.com/drive/v2/reference/files/insert
// Files update: https://developers.google.com/drive/v2/reference/files/update
//
// This contains code adapted from google.golang.org/api (C) the GO AUTHORS
package drive
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"github.com/ncw/rclone/fs"
"google.golang.org/api/drive/v2"
"google.golang.org/api/googleapi"
)
const (
// statusResumeIncomplete is the code returned by the Google uploader when the transfer is not yet complete.
statusResumeIncomplete = 308
// Number of times to try each chunk
maxTries = 10
)
// resumableUpload is used by the generated APIs to provide resumable uploads.
// It is not used by developers directly.
type resumableUpload struct {
f *FsDrive
remote string
// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
URI string
// Media is the object being uploaded.
Media io.Reader
// MediaType defines the media type, e.g. "image/jpeg".
MediaType string
// ContentLength is the full size of the object being uploaded.
ContentLength int64
// Return value
ret *drive.File
}
// Upload the io.Reader in of size bytes with contentType and info
func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *drive.File, remote string) (*drive.File, error) {
fileId := info.Id
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(info)
if err != nil {
return nil, err
}
params := make(url.Values)
params.Set("alt", "json")
params.Set("uploadType", "resumable")
urls := "https://www.googleapis.com/upload/drive/v2/files"
method := "POST"
if fileId != "" {
params.Set("setModifiedDate", "true")
urls += "/{fileId}"
method = "PUT"
}
urls += "?" + params.Encode()
req, _ := http.NewRequest(method, urls, body)
googleapi.Expand(req.URL, map[string]string{
"fileId": fileId,
})
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
req.Header.Set("X-Upload-Content-Type", contentType)
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
req.Header.Set("User-Agent", fs.UserAgent)
var res *http.Response
f.call(&err, func() {
res, err = f.client.Do(req)
if err == nil {
defer googleapi.CloseBody(res)
err = googleapi.CheckResponse(res)
}
})
if err != nil {
return nil, err
}
loc := res.Header.Get("Location")
rx := &resumableUpload{
f: f,
remote: remote,
URI: loc,
Media: in,
MediaType: contentType,
ContentLength: size,
}
return rx.Upload()
}
// Make an http.Request for the range passed in
func (rx *resumableUpload) makeRequest(start int64, body []byte) *http.Request {
reqSize := int64(len(body))
req, _ := http.NewRequest("POST", rx.URI, bytes.NewBuffer(body))
req.ContentLength = reqSize
if reqSize != 0 {
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength))
} else {
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength))
}
req.Header.Set("Content-Type", rx.MediaType)
req.Header.Set("User-Agent", fs.UserAgent)
return req
}
// rangeRE matches the transfer status response from the server. $1 is
// the last byte index uploaded.
var rangeRE = regexp.MustCompile(`^0\-(\d+)$`)
// Query drive for the amount transferred so far
//
// If error is nil, then start should be valid
func (rx *resumableUpload) transferStatus() (start int64, err error) {
req := rx.makeRequest(0, nil)
res, err := rx.f.client.Do(req)
if err != nil {
return 0, err
}
defer googleapi.CloseBody(res)
if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK {
return rx.ContentLength, nil
}
if res.StatusCode != statusResumeIncomplete {
err = googleapi.CheckResponse(res)
if err != nil {
return 0, err
}
return 0, fmt.Errorf("unexpected http return code %v", res.StatusCode)
}
Range := res.Header.Get("Range")
if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 {
start, err = strconv.ParseInt(m[1], 10, 64)
if err == nil {
return start, nil
}
}
return 0, fmt.Errorf("unable to parse range %q", Range)
}
// Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil
func (rx *resumableUpload) transferChunk(start int64, body []byte) (int, error) {
req := rx.makeRequest(start, body)
res, err := rx.f.client.Do(req)
if err != nil {
return 599, err
}
defer googleapi.CloseBody(res)
if res.StatusCode == statusResumeIncomplete {
return res.StatusCode, nil
}
err = googleapi.CheckResponse(res)
if err != nil {
return res.StatusCode, err
}
// When the entire file upload is complete, the server
// responds with an HTTP 201 Created along with any metadata
// associated with this resource. If this request had been
// updating an existing entity rather than creating a new one,
// the HTTP response code for a completed upload would have
// been 200 OK.
//
// So parse the response out of the body. We aren't expecting
// any other 2xx codes, so we parse it unconditionaly on
// StatusCode
if err = json.NewDecoder(res.Body).Decode(&rx.ret); err != nil {
return 598, err
}
return res.StatusCode, nil
}
// Upload uploads the chunks from the input
// It retries each chunk maxTries times (with a pause of uploadPause between attempts).
func (rx *resumableUpload) Upload() (*drive.File, error) {
start := int64(0)
buf := make([]byte, chunkSize)
var StatusCode int
for start < rx.ContentLength {
reqSize := rx.ContentLength - start
if reqSize >= int64(chunkSize) {
reqSize = int64(chunkSize)
} else {
buf = buf[:reqSize]
}
// Read the chunk
_, err := io.ReadFull(rx.Media, buf)
if err != nil {
return nil, err
}
// Transfer the chunk
for try := 1; try <= maxTries; try++ {
fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries)
rx.f.beginCall()
StatusCode, err = rx.transferChunk(start, buf)
rx.f.endCall(err)
if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK {
goto success
}
fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err)
}
fs.Debug(rx.remote, "Failed to send chunk")
return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err)
success:
start += reqSize
}
// Resume or retry uploads that fail due to connection interruptions or
// any 5xx errors, including:
//
// 500 Internal Server Error
// 502 Bad Gateway
// 503 Service Unavailable
// 504 Gateway Timeout
//
// Use an exponential backoff strategy if any 5xx server error is
// returned when resuming or retrying upload requests. These errors can
// occur if a server is getting overloaded. Exponential backoff can help
// alleviate these kinds of problems during periods of high volume of
// requests or heavy network traffic. Other kinds of requests should not
// be handled by exponential backoff but you can still retry a number of
// them. When retrying these requests, limit the number of times you
// retry them. For example your code could limit to ten retries or less
// before reporting an error.
//
// Handle 404 Not Found errors when doing resumable uploads by starting
// the entire upload over from the beginning.
if rx.ret == nil {
return nil, fs.RetryErrorf("Incomplete upload - retry, last error %d", StatusCode)
}
return rx.ret, nil
}
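
Each chunk in the resumable protocol above is described to the server with a `Content-Range: bytes start-end/total` header, and only the final chunk may be short. A worked example of the ranges `makeRequest` would produce with the default 256 KiB chunk size (the 600 KiB file size is illustrative):

```
package main

import "fmt"

// chunkRanges prints the Content-Range values for a file of the given
// size uploaded in fixed-size chunks, mirroring the loop in Upload.
func chunkRanges(size, chunkSize int64) {
	for start := int64(0); start < size; start += chunkSize {
		reqSize := size - start
		if reqSize > chunkSize {
			reqSize = chunkSize
		}
		fmt.Printf("Content-Range: bytes %d-%d/%d\n", start, start+reqSize-1, size)
	}
}

func main() {
	chunkRanges(600*1024, 256*1024)
	// Output:
	// Content-Range: bytes 0-262143/614400
	// Content-Range: bytes 262144-524287/614400
	// Content-Range: bytes 524288-614399/614400
}
```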

View File

@@ -17,6 +17,20 @@ This is a JSON decode error - from Update / UploadByChunk
 - Caused by 500 error from dropbox
 - See https://github.com/stacktic/dropbox/issues/1
 - Possibly confusing dropbox with excess concurrency?
+
+FIXME implement timeouts - need to get "github.com/stacktic/dropbox"
+and hence "golang.org/x/oauth2" which uses DefaultTransport unless it
+is set in the context passed into .Client()
+
+func (db *Dropbox) client() *http.Client {
+	return db.config.Client(oauth2.NoContext, db.token)
+}
+
+// HTTPClient is the context key to use with golang.org/x/net/context's
+// WithValue function to associate an *http.Client value with a context.
+var HTTPClient ContextKey
+
+So pass in a context with HTTPClient set...
 */
 import (
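
The FIXME above points at the escape hatch that golang.org/x/oauth2 documents for exactly this problem: put a custom `*http.Client` into the context under the `oauth2.HTTPClient` key, and `Config.Client` builds on it instead of `http.DefaultTransport`. A minimal sketch of that approach, using the modern `context` package and placeholder credentials:

```
package main

import (
	"context"
	"net/http"
	"time"

	"golang.org/x/oauth2"
)

func main() {
	conf := &oauth2.Config{ClientID: "id", ClientSecret: "secret"} // placeholders
	token := &oauth2.Token{AccessToken: "token"}                   // placeholder

	// A base client carrying the timeouts we want oauth2 to inherit.
	base := &http.Client{Timeout: 5 * time.Minute}

	// oauth2 looks this key up in the context instead of falling back
	// to http.DefaultTransport.
	ctx := context.WithValue(context.Background(), oauth2.HTTPClient, base)

	client := conf.Client(ctx, token)
	_ = client // requests through client now go via base's transport
}
```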

View File

@@ -6,6 +6,8 @@ import (
 	"bufio"
 	"fmt"
 	"log"
+	"math"
+	"net/http"
 	"os"
 	"os/user"
 	"path"
@@ -15,6 +17,7 @@ import (
 	"time"
 	"github.com/Unknwon/goconfig"
+	"github.com/mreiferson/go-httpclient"
 	"github.com/ogier/pflag"
 )
@@ -35,32 +38,43 @@ var (
 	// Global config
 	Config = &ConfigInfo{}
 	// Flags
 	verbose      = pflag.BoolP("verbose", "v", false, "Print lots more stuff")
 	quiet        = pflag.BoolP("quiet", "q", false, "Print as little stuff as possible")
 	modifyWindow = pflag.DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same")
 	checkers     = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.")
 	transfers    = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
 	configFile   = pflag.StringP("config", "", ConfigPath, "Config file.")
 	dryRun       = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
+	connectTimeout = pflag.DurationP("contimeout", "", 60*time.Second, "Connect timeout")
+	timeout        = pflag.DurationP("timeout", "", 5*60*time.Second, "IO idle timeout")
 	bwLimit SizeSuffix
 )
 func init() {
-	pflag.VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix K|M|G")
+	pflag.VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix k|M|G")
 }
 // Turn SizeSuffix into a string
-func (x *SizeSuffix) String() string {
+func (x SizeSuffix) String() string {
+	scaled := float64(0)
+	suffix := ""
 	switch {
-	case *x == 0:
+	case x == 0:
 		return "0"
-	case *x < 1024*1024:
-		return fmt.Sprintf("%.3fk", float64(*x)/1024)
-	case *x < 1024*1024*1024:
-		return fmt.Sprintf("%.3fM", float64(*x)/1024/1024)
+	case x < 1024*1024:
+		scaled = float64(x) / 1024
+		suffix = "k"
+	case x < 1024*1024*1024:
+		scaled = float64(x) / 1024 / 1024
+		suffix = "M"
 	default:
-		return fmt.Sprintf("%.3fG", float64(*x)/1024/1024/1024)
+		scaled = float64(x) / 1024 / 1024 / 1024
+		suffix = "G"
 	}
+	if math.Floor(scaled) == scaled {
+		return fmt.Sprintf("%.0f%s", scaled, suffix)
+	}
+	return fmt.Sprintf("%.3f%s", scaled, suffix)
 }
@@ -89,6 +103,9 @@ func (x *SizeSuffix) Set(s string) error {
 	if err != nil {
 		return err
 	}
+	if value < 0 {
+		return fmt.Errorf("Size can't be negative %q", s)
+	}
 	value *= multiplier
 	*x = SizeSuffix(value)
 	return nil
@@ -99,12 +116,48 @@ var _ pflag.Value = (*SizeSuffix)(nil)
 // Filesystem config options
 type ConfigInfo struct {
 	Verbose      bool
 	Quiet        bool
 	DryRun       bool
 	ModifyWindow time.Duration
 	Checkers     int
 	Transfers    int
+	ConnectTimeout time.Duration // Connect timeout
+	Timeout        time.Duration // Data channel timeout
+}
+// Transport returns an http.RoundTripper with the correct timeouts
+func (ci *ConfigInfo) Transport() http.RoundTripper {
+	return &httpclient.Transport{
+		Proxy:               http.ProxyFromEnvironment,
+		MaxIdleConnsPerHost: ci.Checkers + ci.Transfers + 1,
+		// ConnectTimeout, if non-zero, is the maximum amount of time a dial will wait for
+		// a connect to complete.
+		ConnectTimeout: ci.ConnectTimeout,
+		// ResponseHeaderTimeout, if non-zero, specifies the amount of
+		// time to wait for a server's response headers after fully
+		// writing the request (including its body, if any). This
+		// time does not include the time to read the response body.
+		ResponseHeaderTimeout: ci.Timeout,
+		// RequestTimeout, if non-zero, specifies the amount of time for the entire
+		// request to complete (including all of the above timeouts + entire response body).
+		// This should never be less than the sum total of the above two timeouts.
+		//RequestTimeout: NOT SET,
+		// ReadWriteTimeout, if non-zero, will set a deadline for every Read and
+		// Write operation on the request connection.
+		ReadWriteTimeout: ci.Timeout,
+	}
+}
+// Transport returns an http.Client with the correct timeouts
+func (ci *ConfigInfo) Client() *http.Client {
+	return &http.Client{
+		Transport: ci.Transport(),
+	}
 }
@@ -139,6 +192,8 @@ func LoadConfig() {
 	Config.Checkers = *checkers
 	Config.Transfers = *transfers
 	Config.DryRun = *dryRun
+	Config.Timeout = *timeout
+	Config.ConnectTimeout = *connectTimeout
 	ConfigPath = *configFile
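
A sketch of how a backend consumes these new settings: `LoadConfig` copies the flag values into `fs.Config`, and anything built from `Client()` inherits both timeouts. The manual field assignments and the URL below are illustrative only:

```
package main

import (
	"fmt"
	"time"

	"github.com/ncw/rclone/fs"
)

func main() {
	// Normally filled from --contimeout and --timeout by LoadConfig.
	fs.Config.ConnectTimeout = 60 * time.Second
	fs.Config.Timeout = 5 * time.Minute

	// Client() wraps Transport(), so every request made through it
	// gets the connect, response-header and read/write timeouts.
	client := fs.Config.Client()
	resp, err := client.Get("https://api.example.com/") // placeholder URL
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```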

View File

@@ -9,10 +9,11 @@ func TestSizeSuffixString(t *testing.T) {
 	}{
 		{0, "0"},
 		{102, "0.100k"},
-		{1024, "1.000k"},
-		{1024 * 1024, "1.000M"},
-		{1024 * 1024 * 1024, "1.000G"},
-		{10 * 1024 * 1024 * 1024, "10.000G"},
+		{1024, "1k"},
+		{1024 * 1024, "1M"},
+		{1024 * 1024 * 1024, "1G"},
+		{10 * 1024 * 1024 * 1024, "10G"},
+		{10.1 * 1024 * 1024 * 1024, "10.100G"},
 	} {
 		ss := SizeSuffix(test.in)
 		got := ss.String()
@@ -41,6 +42,7 @@ func TestSizeSuffixSet(t *testing.T) {
 		{"1p", 0, true},
 		{"1.p", 0, true},
 		{"1p", 0, true},
+		{"-1K", 0, true},
 	} {
 		ss := SizeSuffix(0)
 		err := ss.Set(test.in)
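
The new expectations ("1k" rather than "1.000k") come from the `math.Floor` check in the reworked `String()` in the previous file: exact multiples print with no decimals, everything else keeps three. A tiny illustration of just that branch:

```
package main

import (
	"fmt"
	"math"
)

func main() {
	// Mirrors the final branch of the new SizeSuffix.String()
	for _, scaled := range []float64{1, 10, 10.1} {
		if math.Floor(scaled) == scaled {
			fmt.Printf("%.0fG\n", scaled) // whole number: "1G", "10G"
		} else {
			fmt.Printf("%.3fG\n", scaled) // fractional: "10.100G"
		}
	}
}
```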

View File

@@ -244,8 +244,8 @@ func NewFs(path string) (Fs, error) {
 // Outputs log for object
 func OutputLog(o interface{}, text string, args ...interface{}) {
 	description := ""
-	if x, ok := o.(fmt.Stringer); ok {
-		description = x.String() + ": "
+	if o != nil {
+		description = fmt.Sprintf("%v: ", o)
 	}
 	out := fmt.Sprintf(text, args...)
 	log.Print(description + out)

View File

@@ -109,14 +109,19 @@ func MimeType(o Object) string {
 }
 // Used to remove a failed copy
-func removeFailedCopy(dst Object) {
-	if dst != nil {
-		Debug(dst, "Removing failed copy")
-		removeErr := dst.Remove()
-		if removeErr != nil {
-			Debug(dst, "Failed to remove failed copy: %s", removeErr)
-		}
+//
+// Returns whether the file was succesfully removed or not
+func removeFailedCopy(dst Object) bool {
+	if dst == nil {
+		return false
 	}
+	Debug(dst, "Removing failed copy")
+	removeErr := dst.Remove()
+	if removeErr != nil {
+		Debug(dst, "Failed to remove failed copy: %s", removeErr)
+		return false
+	}
+	return true
 }
@@ -150,7 +155,11 @@ tryAgain:
 	if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries {
 		tries++
 		Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries)
-		removeFailedCopy(dst)
+		if removeFailedCopy(dst) {
+			// If we removed dst, then nil it out and note we are not updating
+			dst = nil
+			doUpdate = false
+		}
 		goto tryAgain
 	}
 	if err == nil {
@@ -270,6 +279,20 @@ func DeleteFiles(to_be_deleted ObjectsChan) {
 	wg.Wait()
 }
+// Read a map of Object.Remote to Object for the given Fs
+func readFilesMap(fs Fs) map[string]Object {
+	files := make(map[string]Object)
+	for o := range fs.List() {
+		remote := o.Remote()
+		if _, ok := files[remote]; !ok {
+			files[remote] = o
+		} else {
+			Log(o, "Duplicate file detected")
+		}
+	}
+	return files
+}
 // Syncs fsrc into fdst
 //
 // If Delete is true then it deletes any files in fdst that aren't in fsrc
@@ -284,10 +307,7 @@ func Sync(fdst, fsrc Fs, Delete bool) error {
 	// Read the destination files first
 	// FIXME could do this in parallel and make it use less memory
-	delFiles := make(map[string]Object)
-	for dst := range fdst.List() {
-		delFiles[dst.Remote()] = dst
-	}
+	delFiles := readFilesMap(fdst)
 	// Read source files checking them off against dest files
 	to_be_checked := make(ObjectPairChan, Config.Transfers)
@@ -352,22 +372,20 @@ func Check(fdst, fsrc Fs) error {
 	// Read the destination files first
 	// FIXME could do this in parallel and make it use less memory
-	dstFiles := make(map[string]Object)
-	for dst := range fdst.List() {
-		dstFiles[dst.Remote()] = dst
-	}
+	dstFiles := readFilesMap(fdst)
 	// Read the source files checking them against dstFiles
 	// FIXME could do this in parallel and make it use less memory
-	srcFiles := make(map[string]Object)
+	srcFiles := readFilesMap(fsrc)
+	// Move all the common files into commonFiles and delete then
+	// from srcFiles and dstFiles
 	commonFiles := make(map[string][]Object)
-	for src := range fsrc.List() {
-		remote := src.Remote()
+	for remote, src := range srcFiles {
 		if dst, ok := dstFiles[remote]; ok {
 			commonFiles[remote] = []Object{dst, src}
+			delete(srcFiles, remote)
 			delete(dstFiles, remote)
-		} else {
-			srcFiles[remote] = src
 		}
 	}

View File

@@ -1,39 +0,0 @@
package fs
import (
"io"
"os"
)
// SeekWrapper wraps an io.Reader with a basic Seek method which
// returns the Size attribute.
//
// This is used for google.golang.org/api/googleapi/googleapi.go
// to detect the length (see getReaderSize function)
//
// Without this the getReaderSize function reads the entire file into
// memory to find its length.
type SeekWrapper struct {
In io.Reader
Size int64
}
// Read bytes from the object - see io.Reader
func (file *SeekWrapper) Read(p []byte) (n int, err error) {
return file.In.Read(p)
}
// Seek - minimal implementation for Google API length detection
func (file *SeekWrapper) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_CUR:
return 0, nil
case os.SEEK_END:
return file.Size, nil
}
return 0, nil
}
// Interfaces that SeekWrapper implements
var _ io.Reader = (*SeekWrapper)(nil)
var _ io.Seeker = (*SeekWrapper)(nil)
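
For context on why those two `whence` cases were enough: googleapi's size probe (the `getReaderSize` function mentioned above) discovers a reader's length by seeking. A sketch of that style of probe, using the modern `io.Seek*` constants in place of the old `os.SEEK_*` ones — `readerSize` is our illustrative name, not the googleapi API:

```
package main

import (
	"fmt"
	"io"
	"strings"
)

// readerSize finds the remaining length of a seekable reader by seeking
// to the end and back. SeekWrapper satisfied exactly this probe:
// SEEK_CUR answered 0 and SEEK_END answered Size, with no real seeking.
func readerSize(in io.Reader) (int64, bool) {
	s, ok := in.(io.Seeker)
	if !ok {
		return 0, false // not seekable: caller must buffer to find the size
	}
	cur, err := s.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, false
	}
	end, err := s.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, false
	}
	// Restore the original position
	if _, err := s.Seek(cur, io.SeekStart); err != nil {
		return 0, false
	}
	return end - cur, true
}

func main() {
	n, ok := readerSize(strings.NewReader("hello world"))
	fmt.Println(n, ok) // 11 true
}
```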

View File

@@ -1,3 +1,3 @@
 package fs
-const Version = "v1.11"
+const Version = "v1.13"

View File

@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
-	"net/http"
 	"code.google.com/p/goauth2/oauth"
 	"github.com/ncw/rclone/fs"
@@ -82,7 +81,7 @@ func (auth *Auth) newTransport(name string) (*oauth.Transport, error) {
 	t := &oauth.Transport{
 		Config:    config,
-		Transport: http.DefaultTransport,
+		Transport: fs.Config.Transport(),
 	}
 	return t, nil

View File

@@ -359,7 +359,6 @@ func (f *FsStorage) ListDir() fs.DirChan {
 func (f *FsStorage) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) {
 	// Temporary FsObject under construction
 	o := &FsObjectStorage{storage: f, remote: remote}
-	in = &fs.SeekWrapper{In: in, Size: size}
 	return o, o.Update(in, modTime, size)
 }
@@ -556,7 +555,6 @@ func (o *FsObjectStorage) Update(in io.Reader, modTime time.Time, size int64) er
 		Updated:  modTime.Format(timeFormatOut), // Doesn't get set
 		Metadata: metadataFromModTime(modTime),
 	}
-	in = &fs.SeekWrapper{In: in, Size: size}
 	newObject, err := o.storage.svc.Objects.Insert(o.storage.bucket, &object).Media(in).Name(object.Name).PredefinedAcl(o.storage.objectAcl).Do()
 	if err != nil {
 		return err

View File

@@ -62,8 +62,8 @@ var Commands = []Command{
 		ArgsHelp: "source:path dest:path",
 		Help: `
 		Copy the source to the destination. Doesn't transfer
-		unchanged files, testing first by modification time then by
-		size. Doesn't delete files from the destination.`,
+		unchanged files, testing by size and modification time or
+		MD5SUM. Doesn't delete files from the destination.`,
 		Run: func(fdst, fsrc fs.Fs) {
 			err := fs.Sync(fdst, fsrc, false)
 			if err != nil {
@@ -77,11 +77,11 @@ var Commands = []Command{
 		Name:     "sync",
 		ArgsHelp: "source:path dest:path",
 		Help: `
-		Sync the source to the destination. Doesn't transfer
-		unchanged files, testing first by modification time then by
-		size. Deletes any files that exist in source that don't
-		exist in destination. Since this can cause data loss, test
-		first with the --dry-run flag.`,
+		Sync the source to the destination, changing the destination
+		only. Doesn't transfer unchanged files, testing by size and
+		modification time or MD5SUM. Destination is updated to match
+		source, including deleting files if necessary. Since this can
+		cause data loss, test first with the --dry-run flag.`,
 		Run: func(fdst, fsrc fs.Fs) {
 			err := fs.Sync(fdst, fsrc, true)
 			if err != nil {
@@ -123,7 +123,8 @@ var Commands = []Command{
 		Name:     "lsl",
 		ArgsHelp: "[remote:path]",
 		Help: `
-		List all the objects in the the path with modification time, size and path.`,
+		List all the objects in the the path with modification time,
+		size and path.`,
 		Run: func(fdst, fsrc fs.Fs) {
 			err := fs.ListLong(fdst, os.Stdout)
 			if err != nil {
@@ -242,7 +243,8 @@ Subcommands:
 	fmt.Fprintf(os.Stderr, "Options:\n")
 	pflag.PrintDefaults()
 	fmt.Fprintf(os.Stderr, `
-It is only necessary to use a unique prefix of the subcommand, eg 'up' for 'upload'.
+It is only necessary to use a unique prefix of the subcommand, eg 'up'
+for 'upload'.
 `)
 }

View File

@@ -184,6 +184,7 @@ func s3Connection(name string) (*s3.S3, error) {
 	}
 	c := s3.New(auth, region)
+	c.Client = fs.Config.Client()
 	return c, nil
 }
@@ -419,9 +420,17 @@ func (o *FsObjectS3) Remote() string {
 	return o.remote
 }
+var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
 // Md5sum returns the Md5sum of an object returning a lowercase hex string
 func (o *FsObjectS3) Md5sum() (string, error) {
-	return strings.Trim(strings.ToLower(o.etag), `"`), nil
+	etag := strings.Trim(strings.ToLower(o.etag), `"`)
+	// Check the etag is a valid md5sum
+	if !matchMd5.MatchString(etag) {
+		fs.Debug(o, "Invalid md5sum (probably multipart uploaded) - ignoring: %q", etag)
+		return "", nil
+	}
+	return etag, nil
 }
 // Size returns the size of an object in bytes
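
Why the regexp is needed: a single-part S3 upload uses the object's MD5 as its etag, but a multipart upload returns an aggregate checksum with a `-<part count>` suffix, which is not the MD5 of the content. A quick demonstration against `matchMd5` (the second etag is a representative made-up value):

```
package main

import (
	"fmt"
	"regexp"
)

var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)

func main() {
	// Plain upload: etag is the hex MD5 of the body - accepted.
	fmt.Println(matchMd5.MatchString("9e107d9d372bb6826bd81d3542a419d6")) // true
	// Multipart upload: aggregate checksum plus a part count - rejected,
	// so Md5sum() returns "" and the etag is ignored.
	fmt.Println(matchMd5.MatchString("d41d8cd98f00b204e9800998ecf8427e-38")) // false
}
```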

View File

@@ -113,12 +113,15 @@ func swiftConnection(name string) (*swift.Connection, error) {
 		return nil, errors.New("auth not found")
 	}
 	c := &swift.Connection{
 		UserName:       userName,
 		ApiKey:         apiKey,
 		AuthUrl:        authUrl,
 		UserAgent:      fs.UserAgent,
 		Tenant:         fs.ConfigFile.MustValue(name, "tenant"),
 		Region:         fs.ConfigFile.MustValue(name, "region"),
+		ConnectTimeout: 10 * fs.Config.ConnectTimeout, // Use the timeouts in the transport
+		Timeout:        10 * fs.Config.Timeout,        // Use the timeouts in the transport
+		Transport:      fs.Config.Transport(),
 	}
 	err := c.Authenticate()
 	if err != nil {