mirror of
https://github.com/rclone/rclone.git
synced 2025-12-15 15:53:41 +00:00
Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88293a4b8a | ||
|
|
981104519e | ||
|
|
1d254a3674 | ||
|
|
f88d171afd | ||
|
|
ba2091725e | ||
|
|
7c120b8bc5 | ||
|
|
5cc5429f99 | ||
|
|
09d71239b6 | ||
|
|
c643e4585e | ||
|
|
873db29391 | ||
|
|
81a933ae38 | ||
|
|
ecb3c7bcef | ||
|
|
80000b904c | ||
|
|
c47c9cd440 | ||
|
|
b4a0941d4c | ||
|
|
c03d6a1ec3 | ||
|
|
46d39ebaf7 | ||
|
|
fe68737268 | ||
|
|
2360bf907a | ||
|
|
aa093e991e | ||
|
|
a5974999eb | ||
|
|
24a6ff54c2 | ||
|
|
e89ea3360e | ||
|
|
85f8552c4d | ||
|
|
a287e3ced7 | ||
|
|
8e4d8d13b8 | ||
|
|
cf208ad21b | ||
|
|
0faed16899 | ||
|
|
8d1c0ad07c |
88
README.md
88
README.md
@@ -75,28 +75,34 @@ Subcommands
|
||||
rclone copy source:path dest:path
|
||||
|
||||
Copy the source to the destination. Doesn't transfer
|
||||
unchanged files, testing first by modification time then by
|
||||
unchanged files, testing by size and modification time or
|
||||
MD5SUM. Doesn't delete files from the destination.
|
||||
|
||||
rclone sync source:path dest:path
|
||||
|
||||
Sync the source to the destination. Doesn't transfer
|
||||
unchanged files, testing first by modification time then by
|
||||
MD5SUM. Deletes any files that exist in source that don't
|
||||
exist in destination. Since this can cause data loss, test
|
||||
first with the `--dry-run` flag.
|
||||
Sync the source to the destination, changing the destination
|
||||
only. Doesn't transfer unchanged files, testing by size and
|
||||
modification time or MD5SUM. Destination is updated to match
|
||||
source, including deleting files if necessary. Since this can
|
||||
cause data loss, test first with the `--dry-run` flag.
|
||||
|
||||
rclone ls [remote:path]
|
||||
|
||||
List all the objects in the the path with sizes.
|
||||
|
||||
rclone lsl [remote:path]
|
||||
|
||||
List all the objects in the the path with sizes and timestamps.
|
||||
List all the objects in the the path with size and path.
|
||||
|
||||
rclone lsd [remote:path]
|
||||
|
||||
List all directories/objects/buckets in the the path.
|
||||
List all directories/containers/buckets in the the path.
|
||||
|
||||
rclone lsl [remote:path]
|
||||
|
||||
List all the objects in the the path with modification time,
|
||||
size and path.
|
||||
|
||||
rclone md5sum [remote:path]
|
||||
|
||||
Produces an md5sum file for all the objects in the path. This
|
||||
is in the same format as the standard md5sum tool produces.
|
||||
|
||||
rclone mkdir remote:path
|
||||
|
||||
@@ -117,22 +123,30 @@ Checks the files in the source and destination match. It
|
||||
compares sizes and MD5SUMs and prints a report of files which
|
||||
don't match. It doesn't alter the source or destination.
|
||||
|
||||
rclone md5sum remote:path
|
||||
rclone config
|
||||
|
||||
Produces an md5sum file for all the objects in the path. This is in
|
||||
the same format as the standard md5sum tool produces.
|
||||
Enter an interactive configuration session.
|
||||
|
||||
rclone help
|
||||
|
||||
This help.
|
||||
|
||||
General options:
|
||||
|
||||
```
|
||||
--bwlimit=0: Bandwidth limit in kBytes/s, or use suffix k|M|G
|
||||
--checkers=8: Number of checkers to run in parallel.
|
||||
--config="~/.rclone.conf": Config file.
|
||||
--contimeout=1m0s: Connect timeout
|
||||
-n, --dry-run=false: Do a trial run with no permanent changes
|
||||
--log-file="": Log everything to this file
|
||||
--modify-window=1ns: Max time diff to be considered the same
|
||||
-q, --quiet=false: Print as little stuff as possible
|
||||
--stats=1m0s: Interval to print stats
|
||||
--stats=1m0s: Interval to print stats (0 to disable)
|
||||
--timeout=5m0s: IO idle timeout
|
||||
--transfers=4: Number of file transfers to run in parallel.
|
||||
-v, --verbose=false: Print lots more stuff
|
||||
-V, --version=false: Print the version number
|
||||
```
|
||||
|
||||
Developer options:
|
||||
@@ -161,12 +175,7 @@ So to copy a local directory to a swift container called backup:
|
||||
|
||||
rclone sync /home/source swift:backup
|
||||
|
||||
The modified time is stored as metadata on the object as
|
||||
`X-Object-Meta-Mtime` as floating point since the epoch.
|
||||
|
||||
This is a defacto standard (used in the official python-swiftclient
|
||||
amongst others) for storing the modification time (as read using
|
||||
os.Stat) for an object.
|
||||
For more help see the [online docs on Openstack Swift](http://rclone.org/swift).
|
||||
|
||||
Amazon S3
|
||||
---------
|
||||
@@ -178,8 +187,7 @@ So to copy a local directory to a s3 container called backup
|
||||
|
||||
rclone sync /home/source s3:backup
|
||||
|
||||
The modified time is stored as metadata on the object as
|
||||
`X-Amz-Meta-Mtime` as floating point since the epoch.
|
||||
For more help see the [online docs on Amazon S3](http://rclone.org/s3).
|
||||
|
||||
Google drive
|
||||
------------
|
||||
@@ -194,7 +202,7 @@ To copy a local directory to a drive directory called backup
|
||||
|
||||
rclone copy /home/source remote:backup
|
||||
|
||||
Google drive stores modification times accurate to 1 ms natively.
|
||||
For more help see the [online docs on Google Drive](http://rclone.org/drive).
|
||||
|
||||
Dropbox
|
||||
-------
|
||||
@@ -209,10 +217,7 @@ To copy a local directory to a drive directory called backup
|
||||
|
||||
rclone copy /home/source dropbox:backup
|
||||
|
||||
Md5sums and timestamps in RFC3339 format accurate to 1ns are stored in
|
||||
a Dropbox datastore called "rclone". Dropbox datastores are limited
|
||||
to 100,000 rows so this is the maximum number of files rclone can
|
||||
manage on Dropbox.
|
||||
For more help see the [online docs on Dropbox](http://rclone.org/dropbox).
|
||||
|
||||
Google Cloud Storage
|
||||
--------------------
|
||||
@@ -228,9 +233,7 @@ To copy a local directory to a google cloud storage directory called backup
|
||||
|
||||
rclone copy /home/source remote:backup
|
||||
|
||||
Google google cloud storage stores md5sums natively and rclone stores
|
||||
modification times as metadata on the object, under the "mtime" key in
|
||||
RFC3339 format accurate to 1ns.
|
||||
For more help see the [online docs on Google Cloud Storage](http://rclone.org/googlecloudstorage/).
|
||||
|
||||
Single file copies
|
||||
------------------
|
||||
@@ -259,6 +262,27 @@ Bugs
|
||||
|
||||
Changelog
|
||||
---------
|
||||
* v1.13 - 2015-05-10
|
||||
* Revise documentation (especially sync)
|
||||
* Implement --timeout and --conntimeout
|
||||
* s3: ignore etags from multipart uploads which aren't md5sums
|
||||
* v1.12 - 2015-03-15
|
||||
* drive: Use chunked upload for files above a certain size
|
||||
* drive: add --drive-chunk-size and --drive-upload-cutoff parameters
|
||||
* drive: switch to insert from update when a failed copy deletes the upload
|
||||
* core: Log duplicate files if they are detected
|
||||
* v1.11 - 2015-03-04
|
||||
* swift: add region parameter
|
||||
* drive: fix crash on failed to update remote mtime
|
||||
* In remote paths, change native directory separators to /
|
||||
* Add synchronization to ls/lsl/lsd output to stop corruptions
|
||||
* Ensure all stats/log messages to go stderr
|
||||
* Add --log-file flag to log everything (including panics) to file
|
||||
* Make it possible to disable stats printing with --stats=0
|
||||
* Implement --bwlimit to limit data transfer bandwidth
|
||||
* v1.10 - 2015-02-12
|
||||
* s3: list an unlimited number of items
|
||||
* Fix getting stuck in the configurator
|
||||
* v1.09 - 2015-02-07
|
||||
* windows: Stop drive letters (eg C:) getting mixed up with remotes (eg drive:)
|
||||
* local: Fix directory separators on Windows
|
||||
|
||||
@@ -58,28 +58,34 @@ Subcommands
|
||||
rclone copy source:path dest:path
|
||||
|
||||
Copy the source to the destination. Doesn't transfer
|
||||
unchanged files, testing first by modification time then by
|
||||
unchanged files, testing by size and modification time or
|
||||
MD5SUM. Doesn't delete files from the destination.
|
||||
|
||||
rclone sync source:path dest:path
|
||||
|
||||
Sync the source to the destination. Doesn't transfer
|
||||
unchanged files, testing first by modification time then by
|
||||
MD5SUM. Deletes any files that exist in source that don't
|
||||
exist in destination. Since this can cause data loss, test
|
||||
first with the -dry-run flag.
|
||||
Sync the source to the destination, changing the destination
|
||||
only. Doesn't transfer unchanged files, testing by size and
|
||||
modification time or MD5SUM. Destination is updated to match
|
||||
source, including deleting files if necessary. Since this can
|
||||
cause data loss, test first with the `--dry-run` flag.
|
||||
|
||||
rclone ls [remote:path]
|
||||
|
||||
List all the objects in the the path with sizes.
|
||||
|
||||
rclone lsl [remote:path]
|
||||
|
||||
List all the objects in the the path with sizes and timestamps.
|
||||
List all the objects in the the path with size and path.
|
||||
|
||||
rclone lsd [remote:path]
|
||||
|
||||
List all directories/objects/buckets in the the path.
|
||||
List all directories/containers/buckets in the the path.
|
||||
|
||||
rclone lsl [remote:path]
|
||||
|
||||
List all the objects in the the path with modification time,
|
||||
size and path.
|
||||
|
||||
rclone md5sum [remote:path]
|
||||
|
||||
Produces an md5sum file for all the objects in the path. This
|
||||
is in the same format as the standard md5sum tool produces.
|
||||
|
||||
rclone mkdir remote:path
|
||||
|
||||
@@ -100,21 +106,28 @@ Checks the files in the source and destination match. It
|
||||
compares sizes and MD5SUMs and prints a report of files which
|
||||
don't match. It doesn't alter the source or destination.
|
||||
|
||||
rclone md5sum remote:path
|
||||
rclone config
|
||||
|
||||
Produces an md5sum file for all the objects in the path. This is in
|
||||
the same format as the standard md5sum tool produces.
|
||||
General options:
|
||||
Enter an interactive configuration session.
|
||||
|
||||
rclone help
|
||||
|
||||
This help.
|
||||
|
||||
```
|
||||
--bwlimit=0: Bandwidth limit in kBytes/s, or use suffix k|M|G
|
||||
--checkers=8: Number of checkers to run in parallel.
|
||||
--transfers=4: Number of file transfers to run in parallel.
|
||||
--config="~/.rclone.conf": Config file.
|
||||
--contimeout=1m0s: Connect timeout
|
||||
-n, --dry-run=false: Do a trial run with no permanent changes
|
||||
--log-file="": Log everything to this file
|
||||
--modify-window=1ns: Max time diff to be considered the same
|
||||
-q, --quiet=false: Print as little stuff as possible
|
||||
--stats=1m0s: Interval to print stats
|
||||
--stats=1m0s: Interval to print stats (0 to disable)
|
||||
--timeout=5m0s: IO idle timeout
|
||||
--transfers=4: Number of file transfers to run in parallel.
|
||||
-v, --verbose=false: Print lots more stuff
|
||||
-V, --version=false: Print the version number
|
||||
```
|
||||
|
||||
Developer options:
|
||||
|
||||
@@ -2,34 +2,34 @@
|
||||
title: "Rclone downloads"
|
||||
description: "Download rclone binaries for your OS."
|
||||
type: page
|
||||
date: "2015-02-07"
|
||||
date: "2015-05-10"
|
||||
---
|
||||
|
||||
Rclone Download v1.09
|
||||
Rclone Download v1.13
|
||||
=====================
|
||||
|
||||
* Windows
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-windows-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-windows-amd64.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-windows-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-windows-amd64.zip)
|
||||
* OSX
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-osx-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-osx-amd64.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-osx-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-osx-amd64.zip)
|
||||
* Linux
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-linux-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-linux-amd64.zip)
|
||||
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.09-linux-arm.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-linux-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-linux-amd64.zip)
|
||||
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-linux-arm.zip)
|
||||
* FreeBSD
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-freebsd-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-freebsd-amd64.zip)
|
||||
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.09-freebsd-arm.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-amd64.zip)
|
||||
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-arm.zip)
|
||||
* NetBSD
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-netbsd-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-netbsd-amd64.zip)
|
||||
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.09-netbsd-arm.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-amd64.zip)
|
||||
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-arm.zip)
|
||||
* OpenBSD
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-openbsd-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-openbsd-amd64.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-openbsd-386.zip)
|
||||
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-openbsd-amd64.zip)
|
||||
* Plan 9
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-plan9-386.zip)
|
||||
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-plan9-386.zip)
|
||||
|
||||
Older downloads can be found [here](http://downloads.rclone.org/)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
---
|
||||
title: "Google drive"
|
||||
description: "Rclone docs for Google drive"
|
||||
date: "2014-04-26"
|
||||
date: "2015-05-10"
|
||||
---
|
||||
|
||||
<i class="fa fa-google"></i> Google Drive
|
||||
@@ -73,3 +73,16 @@ Modified time
|
||||
-------------
|
||||
|
||||
Google drive stores modification times accurate to 1 ms.
|
||||
|
||||
Revisions
|
||||
---------
|
||||
|
||||
Google drive stores revisions of files. When you upload a change to
|
||||
an existing file to google drive using rclone it will create a new
|
||||
revision of that file.
|
||||
|
||||
Revisions follow the standard google policy which at time of writing
|
||||
was
|
||||
|
||||
* They are deleted after 30 days or 100 revisions (whatever comes first).
|
||||
* They do not count towards a user storage quota.
|
||||
|
||||
181
drive/drive.go
181
drive/drive.go
@@ -10,10 +10,7 @@ package drive
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"mime"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -42,6 +39,10 @@ const (
|
||||
var (
|
||||
// Flags
|
||||
driveFullList = pflag.BoolP("drive-full-list", "", true, "Use a full listing for directory list. More data but usually quicker.")
|
||||
// chunkSize is the size of the chunks created during a resumable upload and should be a power of two.
|
||||
// 1<<18 is the minimum size supported by the Google uploader, and there is no maximum.
|
||||
chunkSize = fs.SizeSuffix(256 * 1024)
|
||||
driveUploadCutoff = chunkSize
|
||||
// Description of how to auth for this app
|
||||
driveAuth = &googleauth.Auth{
|
||||
Scope: "https://www.googleapis.com/auth/drive",
|
||||
@@ -66,6 +67,8 @@ func init() {
|
||||
Help: "Google Application Client Secret - leave blank to use rclone's.",
|
||||
}},
|
||||
})
|
||||
pflag.VarP(&driveUploadCutoff, "drive-upload-cutoff", "", "Cutoff for switching to chunked upload")
|
||||
pflag.VarP(&chunkSize, "drive-chunk-size", "", "Upload chunk size. Must a power of 2 >= 256k.")
|
||||
}
|
||||
|
||||
// FsDrive represents a remote drive server
|
||||
@@ -77,7 +80,7 @@ type FsDrive struct {
|
||||
rootId string // Id of the root directory
|
||||
foundRoot bool // Whether we have found the root or not
|
||||
findRootLock sync.Mutex // Protect findRoot from concurrent use
|
||||
dirCache dirCache // Map of directory path to directory id
|
||||
dirCache *dirCache // Map of directory path to directory id
|
||||
findDirLock sync.Mutex // Protect findDir from concurrent use
|
||||
pacer chan struct{} // To pace the operations
|
||||
sleepTime time.Duration // Time to sleep for each transaction
|
||||
@@ -102,8 +105,8 @@ type dirCache struct {
|
||||
}
|
||||
|
||||
// Make a new locked map
|
||||
func newDirCache() dirCache {
|
||||
d := dirCache{}
|
||||
func newDirCache() *dirCache {
|
||||
d := &dirCache{}
|
||||
d.Flush()
|
||||
return d
|
||||
}
|
||||
@@ -147,8 +150,12 @@ func (f *FsDrive) String() string {
|
||||
return fmt.Sprintf("Google drive root '%s'", f.root)
|
||||
}
|
||||
|
||||
// Wait for the pace
|
||||
func (f *FsDrive) paceWait() {
|
||||
// Start a call to the drive API
|
||||
//
|
||||
// This must be called as a pair with endCall
|
||||
//
|
||||
// This waits for the pacer token
|
||||
func (f *FsDrive) beginCall() {
|
||||
// pacer starts with a token in and whenever we take one out
|
||||
// XXX ms later we put another in. We could do this with a
|
||||
// Ticker more accurately, but then we'd have to work out how
|
||||
@@ -163,12 +170,14 @@ func (f *FsDrive) paceWait() {
|
||||
}(f.sleepTime)
|
||||
}
|
||||
|
||||
// End a call to the drive API
|
||||
//
|
||||
// Refresh the pace given an error that was returned. It returns a
|
||||
// boolean as to whether the operation should be retried.
|
||||
//
|
||||
// See https://developers.google.com/drive/web/handle-errors
|
||||
// http://stackoverflow.com/questions/18529524/403-rate-limit-after-only-1-insert-per-second
|
||||
func (f *FsDrive) paceRefresh(err error) bool {
|
||||
func (f *FsDrive) endCall(err error) bool {
|
||||
again := false
|
||||
oldSleepTime := f.sleepTime
|
||||
if err == nil {
|
||||
@@ -180,24 +189,41 @@ func (f *FsDrive) paceRefresh(err error) bool {
|
||||
fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
|
||||
}
|
||||
} else {
|
||||
fs.Debug(f, "Error recived: %v", err)
|
||||
if gerr, ok := err.(*googleapi.Error); ok {
|
||||
if len(gerr.Errors) > 0 {
|
||||
fs.Debug(f, "Error recived: %T %#v", err, err)
|
||||
// Check for net error Timeout()
|
||||
if x, ok := err.(interface {
|
||||
Timeout() bool
|
||||
}); ok && x.Timeout() {
|
||||
again = true
|
||||
}
|
||||
// Check for net error Temporary()
|
||||
if x, ok := err.(interface {
|
||||
Temporary() bool
|
||||
}); ok && x.Temporary() {
|
||||
again = true
|
||||
}
|
||||
switch gerr := err.(type) {
|
||||
case *googleapi.Error:
|
||||
if gerr.Code >= 500 && gerr.Code < 600 {
|
||||
// All 5xx errors should be retried
|
||||
again = true
|
||||
} else if len(gerr.Errors) > 0 {
|
||||
reason := gerr.Errors[0].Reason
|
||||
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
|
||||
f.sleepTime *= 2
|
||||
if f.sleepTime > maxSleep {
|
||||
f.sleepTime = maxSleep
|
||||
}
|
||||
if f.sleepTime != oldSleepTime {
|
||||
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
|
||||
}
|
||||
again = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if again {
|
||||
f.sleepTime *= 2
|
||||
if f.sleepTime > maxSleep {
|
||||
f.sleepTime = maxSleep
|
||||
}
|
||||
if f.sleepTime != oldSleepTime {
|
||||
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
|
||||
}
|
||||
}
|
||||
return again
|
||||
}
|
||||
|
||||
@@ -205,11 +231,11 @@ func (f *FsDrive) paceRefresh(err error) bool {
|
||||
// on 403 rate limit exceeded
|
||||
//
|
||||
// This calls fn, expecting it to place its error in perr
|
||||
func (f *FsDrive) pace(perr *error, fn func()) {
|
||||
func (f *FsDrive) call(perr *error, fn func()) {
|
||||
for {
|
||||
f.paceWait()
|
||||
f.beginCall()
|
||||
fn()
|
||||
if !f.paceRefresh(*perr) {
|
||||
if !f.endCall(*perr) {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -253,7 +279,7 @@ func (f *FsDrive) listAll(dirId string, title string, directoriesOnly bool, file
|
||||
OUTER:
|
||||
for {
|
||||
var files *drive.FileList
|
||||
f.pace(&err, func() {
|
||||
f.call(&err, func() {
|
||||
files, err = list.Do()
|
||||
})
|
||||
if err != nil {
|
||||
@@ -273,8 +299,27 @@ OUTER:
|
||||
return
|
||||
}
|
||||
|
||||
// Returns true of x is a power of 2 or zero
|
||||
func isPowerOfTwo(x int64) bool {
|
||||
switch {
|
||||
case x == 0:
|
||||
return true
|
||||
case x < 0:
|
||||
return false
|
||||
default:
|
||||
return (x & (x - 1)) == 0
|
||||
}
|
||||
}
|
||||
|
||||
// NewFs contstructs an FsDrive from the path, container:path
|
||||
func NewFs(name, path string) (fs.Fs, error) {
|
||||
if !isPowerOfTwo(int64(chunkSize)) {
|
||||
return nil, fmt.Errorf("drive: chunk size %v isn't a power of two", chunkSize)
|
||||
}
|
||||
if chunkSize < 256*1024 {
|
||||
return nil, fmt.Errorf("drive: chunk size can't be less than 256k - was %v", chunkSize)
|
||||
}
|
||||
|
||||
t, err := driveAuth.NewTransport(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -303,7 +348,7 @@ func NewFs(name, path string) (fs.Fs, error) {
|
||||
}
|
||||
|
||||
// Read About so we know the root path
|
||||
f.pace(&err, func() {
|
||||
f.call(&err, func() {
|
||||
f.about, err = f.svc.About.Get().Do()
|
||||
})
|
||||
if err != nil {
|
||||
@@ -572,7 +617,7 @@ func (f *FsDrive) _findDir(path string, create bool) (pathId string, err error)
|
||||
Parents: []*drive.ParentReference{{Id: pathId}},
|
||||
}
|
||||
var info *drive.File
|
||||
f.pace(&err, func() {
|
||||
f.call(&err, func() {
|
||||
info, err = f.svc.Files.Insert(createInfo).Do()
|
||||
})
|
||||
if err != nil {
|
||||
@@ -636,7 +681,7 @@ func (f *FsDrive) List() fs.ObjectsChan {
|
||||
err := f.findRoot(false)
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
log.Printf("Couldn't find root: %s", err)
|
||||
fs.Log(f, "Couldn't find root: %s", err)
|
||||
} else {
|
||||
if f.root == "" && *driveFullList {
|
||||
err = f.listDirFull(f.rootId, "", out)
|
||||
@@ -645,7 +690,7 @@ func (f *FsDrive) List() fs.ObjectsChan {
|
||||
}
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
log.Printf("List failed: %s", err)
|
||||
fs.Log(f, "List failed: %s", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -660,7 +705,7 @@ func (f *FsDrive) ListDir() fs.DirChan {
|
||||
err := f.findRoot(false)
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
log.Printf("Couldn't find root: %s", err)
|
||||
fs.Log(f, "Couldn't find root: %s", err)
|
||||
} else {
|
||||
_, err := f.listAll(f.rootId, "", true, false, func(item *drive.File) bool {
|
||||
dir := &fs.Dir{
|
||||
@@ -674,7 +719,7 @@ func (f *FsDrive) ListDir() fs.DirChan {
|
||||
})
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
log.Printf("ListDir failed: %s", err)
|
||||
fs.Log(f, "ListDir failed: %s", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -700,33 +745,33 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
|
||||
return o, fmt.Errorf("Couldn't find or make directory: %s", err)
|
||||
}
|
||||
|
||||
// Guess the mime type
|
||||
mimeType := mime.TypeByExtension(path.Ext(o.remote))
|
||||
if mimeType == "" {
|
||||
mimeType = "application/octet-stream"
|
||||
}
|
||||
modifiedDate := modTime.Format(timeFormatOut)
|
||||
|
||||
// Define the metadata for the file we are going to create.
|
||||
createInfo := &drive.File{
|
||||
Title: leaf,
|
||||
Description: leaf,
|
||||
Parents: []*drive.ParentReference{{Id: directoryId}},
|
||||
MimeType: mimeType,
|
||||
ModifiedDate: modifiedDate,
|
||||
MimeType: fs.MimeType(o),
|
||||
ModifiedDate: modTime.Format(timeFormatOut),
|
||||
}
|
||||
|
||||
// Make the API request to upload metadata and file data.
|
||||
in = &fs.SeekWrapper{In: in, Size: size}
|
||||
var info *drive.File
|
||||
// Don't retry, return a retry error instead
|
||||
f.paceWait()
|
||||
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
|
||||
if f.paceRefresh(err) {
|
||||
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
|
||||
}
|
||||
if err != nil {
|
||||
return o, fmt.Errorf("Upload failed: %s", err)
|
||||
if size == 0 || size < int64(driveUploadCutoff) {
|
||||
// Make the API request to upload metadata and file data.
|
||||
// Don't retry, return a retry error instead
|
||||
f.beginCall()
|
||||
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
|
||||
if f.endCall(err) {
|
||||
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
|
||||
}
|
||||
if err != nil {
|
||||
return o, fmt.Errorf("Upload failed: %s", err)
|
||||
}
|
||||
} else {
|
||||
// Upload the file in chunks
|
||||
info, err = f.Upload(in, size, createInfo.MimeType, createInfo, remote)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
}
|
||||
o.setMetaData(info)
|
||||
return o, nil
|
||||
@@ -746,7 +791,7 @@ func (f *FsDrive) Rmdir() error {
|
||||
return err
|
||||
}
|
||||
var children *drive.ChildList
|
||||
f.pace(&err, func() {
|
||||
f.call(&err, func() {
|
||||
children, err = f.svc.Children.List(f.rootId).MaxResults(10).Do()
|
||||
})
|
||||
if err != nil {
|
||||
@@ -757,7 +802,7 @@ func (f *FsDrive) Rmdir() error {
|
||||
}
|
||||
// Delete the directory if it isn't the root
|
||||
if f.root != "" {
|
||||
f.pace(&err, func() {
|
||||
f.call(&err, func() {
|
||||
err = f.svc.Files.Delete(f.rootId).Do()
|
||||
})
|
||||
if err != nil {
|
||||
@@ -786,7 +831,7 @@ func (f *FsDrive) Purge() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.pace(&err, func() {
|
||||
f.call(&err, func() {
|
||||
err = f.svc.Files.Delete(f.rootId).Do()
|
||||
})
|
||||
f.resetRoot()
|
||||
@@ -898,12 +943,13 @@ func (o *FsObjectDrive) SetModTime(modTime time.Time) {
|
||||
}
|
||||
// Set modified date
|
||||
var info *drive.File
|
||||
o.drive.pace(&err, func() {
|
||||
o.drive.call(&err, func() {
|
||||
info, err = o.drive.svc.Files.Update(o.id, updateInfo).SetModifiedDate(true).Do()
|
||||
})
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
fs.Log(o, "Failed to update remote mtime: %s", err)
|
||||
return
|
||||
}
|
||||
// Update info from read data
|
||||
o.setMetaData(info)
|
||||
@@ -922,7 +968,7 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) {
|
||||
}
|
||||
req.Header.Set("User-Agent", fs.UserAgent)
|
||||
var res *http.Response
|
||||
o.drive.pace(&err, func() {
|
||||
o.drive.call(&err, func() {
|
||||
res, err = o.drive.client.Do(req)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -947,17 +993,24 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
|
||||
}
|
||||
|
||||
// Make the API request to upload metadata and file data.
|
||||
in = &fs.SeekWrapper{In: in, Size: size}
|
||||
var err error
|
||||
var info *drive.File
|
||||
// Don't retry, return a retry error instead
|
||||
o.drive.paceWait()
|
||||
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
|
||||
if o.drive.paceRefresh(err) {
|
||||
return fs.RetryErrorf("Update failed - retry: %s", err)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("Update failed: %s", err)
|
||||
if size == 0 || size < int64(driveUploadCutoff) {
|
||||
// Don't retry, return a retry error instead
|
||||
o.drive.beginCall()
|
||||
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
|
||||
if o.drive.endCall(err) {
|
||||
return fs.RetryErrorf("Update failed - retry: %s", err)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("Update failed: %s", err)
|
||||
}
|
||||
} else {
|
||||
// Upload the file in chunks
|
||||
info, err = o.drive.Upload(in, size, fs.MimeType(o), updateInfo, o.remote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
o.setMetaData(info)
|
||||
return nil
|
||||
@@ -966,7 +1019,7 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
|
||||
// Remove an object
|
||||
func (o *FsObjectDrive) Remove() error {
|
||||
var err error
|
||||
o.drive.pace(&err, func() {
|
||||
o.drive.call(&err, func() {
|
||||
err = o.drive.svc.Files.Delete(o.id).Do()
|
||||
})
|
||||
return err
|
||||
|
||||
246
drive/upload.go
Normal file
246
drive/upload.go
Normal file
@@ -0,0 +1,246 @@
|
||||
// Upload for drive
|
||||
//
|
||||
// Docs
|
||||
// Resumable upload: https://developers.google.com/drive/web/manage-uploads#resumable
|
||||
// Best practices: https://developers.google.com/drive/web/manage-uploads#best-practices
|
||||
// Files insert: https://developers.google.com/drive/v2/reference/files/insert
|
||||
// Files update: https://developers.google.com/drive/v2/reference/files/update
|
||||
//
|
||||
// This contains code adapted from google.golang.org/api (C) the GO AUTHORS
|
||||
|
||||
package drive
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
"google.golang.org/api/drive/v2"
|
||||
"google.golang.org/api/googleapi"
|
||||
)
|
||||
|
||||
const (
|
||||
// statusResumeIncomplete is the code returned by the Google uploader when the transfer is not yet complete.
|
||||
statusResumeIncomplete = 308
|
||||
|
||||
// Number of times to try each chunk
|
||||
maxTries = 10
|
||||
)
|
||||
|
||||
// resumableUpload is used by the generated APIs to provide resumable uploads.
|
||||
// It is not used by developers directly.
|
||||
type resumableUpload struct {
|
||||
f *FsDrive
|
||||
remote string
|
||||
// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
|
||||
URI string
|
||||
// Media is the object being uploaded.
|
||||
Media io.Reader
|
||||
// MediaType defines the media type, e.g. "image/jpeg".
|
||||
MediaType string
|
||||
// ContentLength is the full size of the object being uploaded.
|
||||
ContentLength int64
|
||||
// Return value
|
||||
ret *drive.File
|
||||
}
|
||||
|
||||
// Upload the io.Reader in of size bytes with contentType and info
|
||||
func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *drive.File, remote string) (*drive.File, error) {
|
||||
fileId := info.Id
|
||||
var body io.Reader = nil
|
||||
body, err := googleapi.WithoutDataWrapper.JSONReader(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params := make(url.Values)
|
||||
params.Set("alt", "json")
|
||||
params.Set("uploadType", "resumable")
|
||||
urls := "https://www.googleapis.com/upload/drive/v2/files"
|
||||
method := "POST"
|
||||
if fileId != "" {
|
||||
params.Set("setModifiedDate", "true")
|
||||
urls += "/{fileId}"
|
||||
method = "PUT"
|
||||
}
|
||||
urls += "?" + params.Encode()
|
||||
req, _ := http.NewRequest(method, urls, body)
|
||||
googleapi.Expand(req.URL, map[string]string{
|
||||
"fileId": fileId,
|
||||
})
|
||||
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
|
||||
req.Header.Set("X-Upload-Content-Type", contentType)
|
||||
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
|
||||
req.Header.Set("User-Agent", fs.UserAgent)
|
||||
var res *http.Response
|
||||
f.call(&err, func() {
|
||||
res, err = f.client.Do(req)
|
||||
if err == nil {
|
||||
defer googleapi.CloseBody(res)
|
||||
err = googleapi.CheckResponse(res)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
loc := res.Header.Get("Location")
|
||||
rx := &resumableUpload{
|
||||
f: f,
|
||||
remote: remote,
|
||||
URI: loc,
|
||||
Media: in,
|
||||
MediaType: contentType,
|
||||
ContentLength: size,
|
||||
}
|
||||
return rx.Upload()
|
||||
}
|
||||
|
||||
// Make an http.Request for the range passed in
|
||||
func (rx *resumableUpload) makeRequest(start int64, body []byte) *http.Request {
|
||||
reqSize := int64(len(body))
|
||||
req, _ := http.NewRequest("POST", rx.URI, bytes.NewBuffer(body))
|
||||
req.ContentLength = reqSize
|
||||
if reqSize != 0 {
|
||||
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength))
|
||||
} else {
|
||||
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength))
|
||||
}
|
||||
req.Header.Set("Content-Type", rx.MediaType)
|
||||
req.Header.Set("User-Agent", fs.UserAgent)
|
||||
return req
|
||||
}
|
||||
|
||||
// rangeRE matches the transfer status response from the server. $1 is
|
||||
// the last byte index uploaded.
|
||||
var rangeRE = regexp.MustCompile(`^0\-(\d+)$`)
|
||||
|
||||
// Query drive for the amount transferred so far
|
||||
//
|
||||
// If error is nil, then start should be valid
|
||||
func (rx *resumableUpload) transferStatus() (start int64, err error) {
|
||||
req := rx.makeRequest(0, nil)
|
||||
res, err := rx.f.client.Do(req)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer googleapi.CloseBody(res)
|
||||
if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK {
|
||||
return rx.ContentLength, nil
|
||||
}
|
||||
if res.StatusCode != statusResumeIncomplete {
|
||||
err = googleapi.CheckResponse(res)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return 0, fmt.Errorf("unexpected http return code %v", res.StatusCode)
|
||||
}
|
||||
Range := res.Header.Get("Range")
|
||||
if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 {
|
||||
start, err = strconv.ParseInt(m[1], 10, 64)
|
||||
if err == nil {
|
||||
return start, nil
|
||||
}
|
||||
}
|
||||
return 0, fmt.Errorf("unable to parse range %q", Range)
|
||||
}
|
||||
|
||||
// Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil
|
||||
func (rx *resumableUpload) transferChunk(start int64, body []byte) (int, error) {
|
||||
req := rx.makeRequest(start, body)
|
||||
res, err := rx.f.client.Do(req)
|
||||
if err != nil {
|
||||
return 599, err
|
||||
}
|
||||
defer googleapi.CloseBody(res)
|
||||
if res.StatusCode == statusResumeIncomplete {
|
||||
return res.StatusCode, nil
|
||||
}
|
||||
err = googleapi.CheckResponse(res)
|
||||
if err != nil {
|
||||
return res.StatusCode, err
|
||||
}
|
||||
|
||||
// When the entire file upload is complete, the server
|
||||
// responds with an HTTP 201 Created along with any metadata
|
||||
// associated with this resource. If this request had been
|
||||
// updating an existing entity rather than creating a new one,
|
||||
// the HTTP response code for a completed upload would have
|
||||
// been 200 OK.
|
||||
//
|
||||
// So parse the response out of the body. We aren't expecting
|
||||
// any other 2xx codes, so we parse it unconditionaly on
|
||||
// StatusCode
|
||||
if err = json.NewDecoder(res.Body).Decode(&rx.ret); err != nil {
|
||||
return 598, err
|
||||
}
|
||||
|
||||
return res.StatusCode, nil
|
||||
}
|
||||
|
||||
// Upload uploads the chunks from the input
|
||||
// It retries each chunk maxTries times (with a pause of uploadPause between attempts).
|
||||
func (rx *resumableUpload) Upload() (*drive.File, error) {
|
||||
start := int64(0)
|
||||
buf := make([]byte, chunkSize)
|
||||
var StatusCode int
|
||||
for start < rx.ContentLength {
|
||||
reqSize := rx.ContentLength - start
|
||||
if reqSize >= int64(chunkSize) {
|
||||
reqSize = int64(chunkSize)
|
||||
} else {
|
||||
buf = buf[:reqSize]
|
||||
}
|
||||
|
||||
// Read the chunk
|
||||
_, err := io.ReadFull(rx.Media, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Transfer the chunk
|
||||
for try := 1; try <= maxTries; try++ {
|
||||
fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries)
|
||||
rx.f.beginCall()
|
||||
StatusCode, err = rx.transferChunk(start, buf)
|
||||
rx.f.endCall(err)
|
||||
if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK {
|
||||
goto success
|
||||
}
|
||||
fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err)
|
||||
}
|
||||
fs.Debug(rx.remote, "Failed to send chunk")
|
||||
return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err)
|
||||
success:
|
||||
|
||||
start += reqSize
|
||||
}
|
||||
// Resume or retry uploads that fail due to connection interruptions or
|
||||
// any 5xx errors, including:
|
||||
//
|
||||
// 500 Internal Server Error
|
||||
// 502 Bad Gateway
|
||||
// 503 Service Unavailable
|
||||
// 504 Gateway Timeout
|
||||
//
|
||||
// Use an exponential backoff strategy if any 5xx server error is
|
||||
// returned when resuming or retrying upload requests. These errors can
|
||||
// occur if a server is getting overloaded. Exponential backoff can help
|
||||
// alleviate these kinds of problems during periods of high volume of
|
||||
// requests or heavy network traffic. Other kinds of requests should not
|
||||
// be handled by exponential backoff but you can still retry a number of
|
||||
// them. When retrying these requests, limit the number of times you
|
||||
// retry them. For example your code could limit to ten retries or less
|
||||
// before reporting an error.
|
||||
//
|
||||
// Handle 404 Not Found errors when doing resumable uploads by starting
|
||||
// the entire upload over from the beginning.
|
||||
if rx.ret == nil {
|
||||
return nil, fs.RetryErrorf("Incomplete upload - retry, last error %d", StatusCode)
|
||||
}
|
||||
return rx.ret, nil
|
||||
}
|
||||
@@ -17,6 +17,20 @@ This is a JSON decode error - from Update / UploadByChunk
|
||||
- Caused by 500 error from dropbox
|
||||
- See https://github.com/stacktic/dropbox/issues/1
|
||||
- Possibly confusing dropbox with excess concurrency?
|
||||
|
||||
FIXME implement timeouts - need to get "github.com/stacktic/dropbox"
|
||||
and hence "golang.org/x/oauth2" which uses DefaultTransport unless it
|
||||
is set in the context passed into .Client()
|
||||
|
||||
func (db *Dropbox) client() *http.Client {
|
||||
return db.config.Client(oauth2.NoContext, db.token)
|
||||
}
|
||||
|
||||
// HTTPClient is the context key to use with golang.org/x/net/context's
|
||||
// WithValue function to associate an *http.Client value with a context.
|
||||
var HTTPClient ContextKey
|
||||
|
||||
So pass in a context with HTTPClient set...
|
||||
*/
|
||||
|
||||
import (
|
||||
|
||||
@@ -10,13 +10,24 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tsenart/tb"
|
||||
)
|
||||
|
||||
// Globals
|
||||
var (
|
||||
Stats = NewStats()
|
||||
Stats = NewStats()
|
||||
tokenBucket *tb.Bucket
|
||||
)
|
||||
|
||||
// Start the token bucket if necessary
|
||||
func startTokenBucket() {
|
||||
if bwLimit > 0 {
|
||||
tokenBucket = tb.NewBucket(int64(bwLimit), 100*time.Millisecond)
|
||||
Log(nil, "Starting bandwidth limiter at %vBytes/s", &bwLimit)
|
||||
}
|
||||
}
|
||||
|
||||
// Stringset holds some strings
|
||||
type StringSet map[string]bool
|
||||
|
||||
@@ -178,6 +189,10 @@ func (file *Account) Read(p []byte) (n int, err error) {
|
||||
if err == io.EOF {
|
||||
// FIXME Do something?
|
||||
}
|
||||
// Limit the transfer speed if required
|
||||
if tokenBucket != nil {
|
||||
tokenBucket.Wait(int64(n))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
142
fs/config.go
142
fs/config.go
@@ -6,6 +6,8 @@ import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
@@ -15,6 +17,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Unknwon/goconfig"
|
||||
"github.com/mreiferson/go-httpclient"
|
||||
"github.com/ogier/pflag"
|
||||
)
|
||||
|
||||
@@ -22,6 +25,8 @@ const (
|
||||
configFileName = ".rclone.conf"
|
||||
)
|
||||
|
||||
type SizeSuffix int64
|
||||
|
||||
// Global
|
||||
var (
|
||||
// Config file
|
||||
@@ -33,23 +38,126 @@ var (
|
||||
// Global config
|
||||
Config = &ConfigInfo{}
|
||||
// Flags
|
||||
verbose = pflag.BoolP("verbose", "v", false, "Print lots more stuff")
|
||||
quiet = pflag.BoolP("quiet", "q", false, "Print as little stuff as possible")
|
||||
modifyWindow = pflag.DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same")
|
||||
checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.")
|
||||
transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
|
||||
configFile = pflag.StringP("config", "", ConfigPath, "Config file.")
|
||||
dryRun = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
|
||||
verbose = pflag.BoolP("verbose", "v", false, "Print lots more stuff")
|
||||
quiet = pflag.BoolP("quiet", "q", false, "Print as little stuff as possible")
|
||||
modifyWindow = pflag.DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same")
|
||||
checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.")
|
||||
transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
|
||||
configFile = pflag.StringP("config", "", ConfigPath, "Config file.")
|
||||
dryRun = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
|
||||
connectTimeout = pflag.DurationP("contimeout", "", 60*time.Second, "Connect timeout")
|
||||
timeout = pflag.DurationP("timeout", "", 5*60*time.Second, "IO idle timeout")
|
||||
bwLimit SizeSuffix
|
||||
)
|
||||
|
||||
func init() {
|
||||
pflag.VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix k|M|G")
|
||||
}
|
||||
|
||||
// Turn SizeSuffix into a string
|
||||
func (x SizeSuffix) String() string {
|
||||
scaled := float64(0)
|
||||
suffix := ""
|
||||
switch {
|
||||
case x == 0:
|
||||
return "0"
|
||||
case x < 1024*1024:
|
||||
scaled = float64(x) / 1024
|
||||
suffix = "k"
|
||||
case x < 1024*1024*1024:
|
||||
scaled = float64(x) / 1024 / 1024
|
||||
suffix = "M"
|
||||
default:
|
||||
scaled = float64(x) / 1024 / 1024 / 1024
|
||||
suffix = "G"
|
||||
}
|
||||
if math.Floor(scaled) == scaled {
|
||||
return fmt.Sprintf("%.0f%s", scaled, suffix)
|
||||
}
|
||||
return fmt.Sprintf("%.3f%s", scaled, suffix)
|
||||
}
|
||||
|
||||
// Set a SizeSuffix
|
||||
func (x *SizeSuffix) Set(s string) error {
|
||||
if len(s) == 0 {
|
||||
return fmt.Errorf("Empty string")
|
||||
}
|
||||
suffix := s[len(s)-1]
|
||||
suffixLen := 1
|
||||
var multiplier float64
|
||||
switch suffix {
|
||||
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.':
|
||||
suffixLen = 0
|
||||
multiplier = 1 << 10
|
||||
case 'k', 'K':
|
||||
multiplier = 1 << 10
|
||||
case 'm', 'M':
|
||||
multiplier = 1 << 20
|
||||
case 'g', 'G':
|
||||
multiplier = 1 << 30
|
||||
default:
|
||||
return fmt.Errorf("Bad suffix %q", suffix)
|
||||
}
|
||||
s = s[:len(s)-suffixLen]
|
||||
value, err := strconv.ParseFloat(s, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if value < 0 {
|
||||
return fmt.Errorf("Size can't be negative %q", s)
|
||||
}
|
||||
value *= multiplier
|
||||
*x = SizeSuffix(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check it satisfies the interface
|
||||
var _ pflag.Value = (*SizeSuffix)(nil)
|
||||
|
||||
// Filesystem config options
|
||||
type ConfigInfo struct {
|
||||
Verbose bool
|
||||
Quiet bool
|
||||
DryRun bool
|
||||
ModifyWindow time.Duration
|
||||
Checkers int
|
||||
Transfers int
|
||||
Verbose bool
|
||||
Quiet bool
|
||||
DryRun bool
|
||||
ModifyWindow time.Duration
|
||||
Checkers int
|
||||
Transfers int
|
||||
ConnectTimeout time.Duration // Connect timeout
|
||||
Timeout time.Duration // Data channel timeout
|
||||
}
|
||||
|
||||
// Transport returns an http.RoundTripper with the correct timeouts
|
||||
func (ci *ConfigInfo) Transport() http.RoundTripper {
|
||||
return &httpclient.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
MaxIdleConnsPerHost: ci.Checkers + ci.Transfers + 1,
|
||||
|
||||
// ConnectTimeout, if non-zero, is the maximum amount of time a dial will wait for
|
||||
// a connect to complete.
|
||||
ConnectTimeout: ci.ConnectTimeout,
|
||||
|
||||
// ResponseHeaderTimeout, if non-zero, specifies the amount of
|
||||
// time to wait for a server's response headers after fully
|
||||
// writing the request (including its body, if any). This
|
||||
// time does not include the time to read the response body.
|
||||
ResponseHeaderTimeout: ci.Timeout,
|
||||
|
||||
// RequestTimeout, if non-zero, specifies the amount of time for the entire
|
||||
// request to complete (including all of the above timeouts + entire response body).
|
||||
// This should never be less than the sum total of the above two timeouts.
|
||||
//RequestTimeout: NOT SET,
|
||||
|
||||
// ReadWriteTimeout, if non-zero, will set a deadline for every Read and
|
||||
// Write operation on the request connection.
|
||||
ReadWriteTimeout: ci.Timeout,
|
||||
}
|
||||
}
|
||||
|
||||
// Transport returns an http.Client with the correct timeouts
|
||||
func (ci *ConfigInfo) Client() *http.Client {
|
||||
return &http.Client{
|
||||
Transport: ci.Transport(),
|
||||
}
|
||||
}
|
||||
|
||||
// Find the config directory
|
||||
@@ -84,6 +192,8 @@ func LoadConfig() {
|
||||
Config.Checkers = *checkers
|
||||
Config.Transfers = *transfers
|
||||
Config.DryRun = *dryRun
|
||||
Config.Timeout = *timeout
|
||||
Config.ConnectTimeout = *connectTimeout
|
||||
|
||||
ConfigPath = *configFile
|
||||
|
||||
@@ -97,6 +207,9 @@ func LoadConfig() {
|
||||
log.Fatalf("Failed to read null config file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start the token bucket limiter
|
||||
startTokenBucket()
|
||||
}
|
||||
|
||||
// Save configuration file.
|
||||
@@ -330,6 +443,7 @@ func EditConfig() {
|
||||
name := ChooseRemote()
|
||||
EditRemote(name)
|
||||
case 'n':
|
||||
nameLoop:
|
||||
for {
|
||||
fmt.Printf("name> ")
|
||||
name := ReadLine()
|
||||
@@ -340,7 +454,7 @@ func EditConfig() {
|
||||
fmt.Printf("Can't use %q as it can be confused a drive letter\n", name)
|
||||
default:
|
||||
NewRemote(name)
|
||||
break
|
||||
break nameLoop
|
||||
}
|
||||
}
|
||||
case 'd':
|
||||
|
||||
57
fs/config_test.go
Normal file
57
fs/config_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package fs
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestSizeSuffixString(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
in float64
|
||||
want string
|
||||
}{
|
||||
{0, "0"},
|
||||
{102, "0.100k"},
|
||||
{1024, "1k"},
|
||||
{1024 * 1024, "1M"},
|
||||
{1024 * 1024 * 1024, "1G"},
|
||||
{10 * 1024 * 1024 * 1024, "10G"},
|
||||
{10.1 * 1024 * 1024 * 1024, "10.100G"},
|
||||
} {
|
||||
ss := SizeSuffix(test.in)
|
||||
got := ss.String()
|
||||
if test.want != got {
|
||||
t.Errorf("Want %v got %v", test.want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSizeSuffixSet(t *testing.T) {
|
||||
for i, test := range []struct {
|
||||
in string
|
||||
want int64
|
||||
err bool
|
||||
}{
|
||||
{"0", 0, false},
|
||||
{"0.1k", 102, false},
|
||||
{"0.1", 102, false},
|
||||
{"1K", 1024, false},
|
||||
{"1", 1024, false},
|
||||
{"2.5", 1024 * 2.5, false},
|
||||
{"1M", 1024 * 1024, false},
|
||||
{"1.g", 1024 * 1024 * 1024, false},
|
||||
{"10G", 10 * 1024 * 1024 * 1024, false},
|
||||
{"", 0, true},
|
||||
{"1p", 0, true},
|
||||
{"1.p", 0, true},
|
||||
{"1p", 0, true},
|
||||
{"-1K", 0, true},
|
||||
} {
|
||||
ss := SizeSuffix(0)
|
||||
err := ss.Set(test.in)
|
||||
if (err != nil) != test.err {
|
||||
t.Errorf("%d: Expecting error %v but got error %v", i, test.err, err)
|
||||
}
|
||||
got := int64(ss)
|
||||
if test.want != got {
|
||||
t.Errorf("%d: Want %v got %v", i, test.want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
7
fs/fs.go
7
fs/fs.go
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"time"
|
||||
)
|
||||
@@ -235,14 +236,16 @@ func NewFs(path string) (Fs, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// change native directory separators to / if there are any
|
||||
fsPath = filepath.ToSlash(fsPath)
|
||||
return fs.NewFs(configName, fsPath)
|
||||
}
|
||||
|
||||
// Outputs log for object
|
||||
func OutputLog(o interface{}, text string, args ...interface{}) {
|
||||
description := ""
|
||||
if x, ok := o.(fmt.Stringer); ok {
|
||||
description = x.String() + ": "
|
||||
if o != nil {
|
||||
description = fmt.Sprintf("%v: ", o)
|
||||
}
|
||||
out := fmt.Sprintf(text, args...)
|
||||
log.Print(description + out)
|
||||
|
||||
@@ -5,6 +5,8 @@ package fs
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@@ -97,15 +99,29 @@ func Equal(src, dst Object) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Used to remove a failed copy
|
||||
func removeFailedCopy(dst Object) {
|
||||
if dst != nil {
|
||||
Debug(dst, "Removing failed copy")
|
||||
removeErr := dst.Remove()
|
||||
if removeErr != nil {
|
||||
Debug(dst, "Failed to remove failed copy: %s", removeErr)
|
||||
}
|
||||
// Returns a guess at the mime type from the extension
|
||||
func MimeType(o Object) string {
|
||||
mimeType := mime.TypeByExtension(path.Ext(o.Remote()))
|
||||
if mimeType == "" {
|
||||
mimeType = "application/octet-stream"
|
||||
}
|
||||
return mimeType
|
||||
}
|
||||
|
||||
// Used to remove a failed copy
|
||||
//
|
||||
// Returns whether the file was succesfully removed or not
|
||||
func removeFailedCopy(dst Object) bool {
|
||||
if dst == nil {
|
||||
return false
|
||||
}
|
||||
Debug(dst, "Removing failed copy")
|
||||
removeErr := dst.Remove()
|
||||
if removeErr != nil {
|
||||
Debug(dst, "Failed to remove failed copy: %s", removeErr)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Copy src object to dst or f if nil
|
||||
@@ -139,7 +155,11 @@ tryAgain:
|
||||
if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries {
|
||||
tries++
|
||||
Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries)
|
||||
removeFailedCopy(dst)
|
||||
if removeFailedCopy(dst) {
|
||||
// If we removed dst, then nil it out and note we are not updating
|
||||
dst = nil
|
||||
doUpdate = false
|
||||
}
|
||||
goto tryAgain
|
||||
}
|
||||
if err == nil {
|
||||
@@ -259,6 +279,20 @@ func DeleteFiles(to_be_deleted ObjectsChan) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Read a map of Object.Remote to Object for the given Fs
|
||||
func readFilesMap(fs Fs) map[string]Object {
|
||||
files := make(map[string]Object)
|
||||
for o := range fs.List() {
|
||||
remote := o.Remote()
|
||||
if _, ok := files[remote]; !ok {
|
||||
files[remote] = o
|
||||
} else {
|
||||
Log(o, "Duplicate file detected")
|
||||
}
|
||||
}
|
||||
return files
|
||||
}
|
||||
|
||||
// Syncs fsrc into fdst
|
||||
//
|
||||
// If Delete is true then it deletes any files in fdst that aren't in fsrc
|
||||
@@ -273,10 +307,7 @@ func Sync(fdst, fsrc Fs, Delete bool) error {
|
||||
|
||||
// Read the destination files first
|
||||
// FIXME could do this in parallel and make it use less memory
|
||||
delFiles := make(map[string]Object)
|
||||
for dst := range fdst.List() {
|
||||
delFiles[dst.Remote()] = dst
|
||||
}
|
||||
delFiles := readFilesMap(fdst)
|
||||
|
||||
// Read source files checking them off against dest files
|
||||
to_be_checked := make(ObjectPairChan, Config.Transfers)
|
||||
@@ -341,22 +372,20 @@ func Check(fdst, fsrc Fs) error {
|
||||
|
||||
// Read the destination files first
|
||||
// FIXME could do this in parallel and make it use less memory
|
||||
dstFiles := make(map[string]Object)
|
||||
for dst := range fdst.List() {
|
||||
dstFiles[dst.Remote()] = dst
|
||||
}
|
||||
dstFiles := readFilesMap(fdst)
|
||||
|
||||
// Read the source files checking them against dstFiles
|
||||
// FIXME could do this in parallel and make it use less memory
|
||||
srcFiles := make(map[string]Object)
|
||||
srcFiles := readFilesMap(fsrc)
|
||||
|
||||
// Move all the common files into commonFiles and delete then
|
||||
// from srcFiles and dstFiles
|
||||
commonFiles := make(map[string][]Object)
|
||||
for src := range fsrc.List() {
|
||||
remote := src.Remote()
|
||||
for remote, src := range srcFiles {
|
||||
if dst, ok := dstFiles[remote]; ok {
|
||||
commonFiles[remote] = []Object{dst, src}
|
||||
delete(srcFiles, remote)
|
||||
delete(dstFiles, remote)
|
||||
} else {
|
||||
srcFiles[remote] = src
|
||||
}
|
||||
}
|
||||
|
||||
@@ -436,6 +465,16 @@ func ListFn(f Fs, fn func(Object)) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// mutex for synchronized output
|
||||
var outMutex sync.Mutex
|
||||
|
||||
// Synchronized fmt.Fprintf
|
||||
func syncFprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
|
||||
outMutex.Lock()
|
||||
defer outMutex.Unlock()
|
||||
return fmt.Fprintf(w, format, a...)
|
||||
}
|
||||
|
||||
// List the Fs to stdout
|
||||
//
|
||||
// Shows size and path
|
||||
@@ -443,7 +482,7 @@ func ListFn(f Fs, fn func(Object)) error {
|
||||
// Lists in parallel which may get them out of order
|
||||
func List(f Fs, w io.Writer) error {
|
||||
return ListFn(f, func(o Object) {
|
||||
fmt.Fprintf(w, "%9d %s\n", o.Size(), o.Remote())
|
||||
syncFprintf(w, "%9d %s\n", o.Size(), o.Remote())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -457,7 +496,7 @@ func ListLong(f Fs, w io.Writer) error {
|
||||
Stats.Checking(o)
|
||||
modTime := o.ModTime()
|
||||
Stats.DoneChecking(o)
|
||||
fmt.Fprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
|
||||
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -475,14 +514,14 @@ func Md5sum(f Fs, w io.Writer) error {
|
||||
Debug(o, "Failed to read MD5: %v", err)
|
||||
md5sum = "UNKNOWN"
|
||||
}
|
||||
fmt.Fprintf(w, "%32s %s\n", md5sum, o.Remote())
|
||||
syncFprintf(w, "%32s %s\n", md5sum, o.Remote())
|
||||
})
|
||||
}
|
||||
|
||||
// List the directories/buckets/containers in the Fs to stdout
|
||||
func ListDir(f Fs, w io.Writer) error {
|
||||
for dir := range f.ListDir() {
|
||||
fmt.Fprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
||||
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ var (
|
||||
flocal, fremote fs.Fs
|
||||
RemoteName = flag.String("remote", "", "Remote to test with, defaults to local filesystem")
|
||||
SubDir = flag.Bool("subdir", false, "Set to test with a sub directory")
|
||||
Verbose = flag.Bool("verbose", false, "Set to enable logging")
|
||||
finalise func()
|
||||
)
|
||||
|
||||
@@ -62,8 +63,8 @@ var t3 = fstest.Time("2011-12-30T12:59:59.000000000Z")
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
fs.LoadConfig()
|
||||
fs.Config.Verbose = false
|
||||
fs.Config.Quiet = true
|
||||
fs.Config.Verbose = *Verbose
|
||||
fs.Config.Quiet = !*Verbose
|
||||
var err error
|
||||
fremote, finalise, err = fstest.RandomRemote(*RemoteName, *SubDir)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// SeekWrapper wraps an io.Reader with a basic Seek method which
|
||||
// returns the Size attribute.
|
||||
//
|
||||
// This is used for google.golang.org/api/googleapi/googleapi.go
|
||||
// to detect the length (see getReaderSize function)
|
||||
//
|
||||
// Without this the getReaderSize function reads the entire file into
|
||||
// memory to find its length.
|
||||
type SeekWrapper struct {
|
||||
In io.Reader
|
||||
Size int64
|
||||
}
|
||||
|
||||
// Read bytes from the object - see io.Reader
|
||||
func (file *SeekWrapper) Read(p []byte) (n int, err error) {
|
||||
return file.In.Read(p)
|
||||
}
|
||||
|
||||
// Seek - minimal implementation for Google API length detection
|
||||
func (file *SeekWrapper) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case os.SEEK_CUR:
|
||||
return 0, nil
|
||||
case os.SEEK_END:
|
||||
return file.Size, nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Interfaces that SeekWrapper implements
|
||||
var _ io.Reader = (*SeekWrapper)(nil)
|
||||
var _ io.Seeker = (*SeekWrapper)(nil)
|
||||
@@ -1,3 +1,3 @@
|
||||
package fs
|
||||
|
||||
const Version = "v1.09"
|
||||
const Version = "v1.13"
|
||||
|
||||
@@ -109,7 +109,7 @@ func TestFsListEmpty(t *testing.T) {
|
||||
func TestFsListDirEmpty(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
for obj := range remote.ListDir() {
|
||||
t.Error("Found unexpected item %q", obj.Name)
|
||||
t.Errorf("Found unexpected item %q", obj.Name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ func TestFsListDirRoot(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
rootRemote, err := fs.NewFs(RemoteName)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to make remote %q: %v", RemoteName, err)
|
||||
t.Fatalf("Failed to make remote %q: %v", RemoteName, err)
|
||||
}
|
||||
found := false
|
||||
for obj := range rootRemote.ListDir() {
|
||||
@@ -191,7 +191,7 @@ func TestFsListRoot(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
rootRemote, err := fs.NewFs(RemoteName)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to make remote %q: %v", RemoteName, err)
|
||||
t.Fatalf("Failed to make remote %q: %v", RemoteName, err)
|
||||
}
|
||||
// Should either find file1 and file2 or nothing
|
||||
found1 := false
|
||||
@@ -384,7 +384,7 @@ func TestLimitedFs(t *testing.T) {
|
||||
file2Copy.Path = "z.txt"
|
||||
fileRemote, err := fs.NewFs(remoteName)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to make remote %q: %v", remoteName, err)
|
||||
t.Fatalf("Failed to make remote %q: %v", remoteName, err)
|
||||
}
|
||||
fstest.CheckListing(t, fileRemote, []fstest.Item{file2Copy})
|
||||
_, ok := fileRemote.(*fs.Limited)
|
||||
@@ -398,7 +398,7 @@ func TestLimitedFsNotFound(t *testing.T) {
|
||||
remoteName := subRemoteName + "/not found.txt"
|
||||
fileRemote, err := fs.NewFs(remoteName)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to make remote %q: %v", remoteName, err)
|
||||
t.Fatalf("Failed to make remote %q: %v", remoteName, err)
|
||||
}
|
||||
fstest.CheckListing(t, fileRemote, []fstest.Item{})
|
||||
_, ok := fileRemote.(*fs.Limited)
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"code.google.com/p/goauth2/oauth"
|
||||
"github.com/ncw/rclone/fs"
|
||||
@@ -82,7 +81,7 @@ func (auth *Auth) newTransport(name string) (*oauth.Transport, error) {
|
||||
|
||||
t := &oauth.Transport{
|
||||
Config: config,
|
||||
Transport: http.DefaultTransport,
|
||||
Transport: fs.Config.Transport(),
|
||||
}
|
||||
|
||||
return t, nil
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
@@ -360,7 +359,6 @@ func (f *FsStorage) ListDir() fs.DirChan {
|
||||
func (f *FsStorage) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) {
|
||||
// Temporary FsObject under construction
|
||||
o := &FsObjectStorage{storage: f, remote: remote}
|
||||
in = &fs.SeekWrapper{In: in, Size: size}
|
||||
return o, o.Update(in, modTime, size)
|
||||
}
|
||||
|
||||
@@ -549,21 +547,14 @@ func (o *FsObjectStorage) Open() (in io.ReadCloser, err error) {
|
||||
//
|
||||
// The new object may have been created if an error is returned
|
||||
func (o *FsObjectStorage) Update(in io.Reader, modTime time.Time, size int64) error {
|
||||
// Guess the content type
|
||||
contentType := mime.TypeByExtension(path.Ext(o.remote))
|
||||
if contentType == "" {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
|
||||
object := storage.Object{
|
||||
Bucket: o.storage.bucket,
|
||||
Name: o.storage.root + o.remote,
|
||||
ContentType: contentType,
|
||||
ContentType: fs.MimeType(o),
|
||||
Size: uint64(size),
|
||||
Updated: modTime.Format(timeFormatOut), // Doesn't get set
|
||||
Metadata: metadataFromModTime(modTime),
|
||||
}
|
||||
in = &fs.SeekWrapper{In: in, Size: size}
|
||||
newObject, err := o.storage.svc.Objects.Insert(o.storage.bucket, &object).Media(in).Name(object.Name).PredefinedAcl(o.storage.objectAcl).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
22
notes.txt
22
notes.txt
@@ -19,30 +19,23 @@ Todo
|
||||
* if object.PseudoDirectory {
|
||||
* fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote())
|
||||
* Make Account wrapper
|
||||
* limit bandwidth for a pool of all individual connectinos
|
||||
* do timeouts by setting a limit, seeing whether io has happened
|
||||
and resetting it if it has
|
||||
* make Account do progress meter
|
||||
* Make logging controllable with flags (mostly done)
|
||||
* -timeout: Make all timeouts be settable with command line parameters
|
||||
* Windows paths? Do we need to translate / and \?
|
||||
* Make a fs.Errorf and count errors and log them at a different level
|
||||
* Add max object size to fs metadata - 5GB for swift, infinite for local, ? for s3
|
||||
* tie into -max-size flag
|
||||
* FIXME Make NewFs to return err.IsAnObject so can put the LimitedFs
|
||||
creation in common code? Or try for as much as possible?
|
||||
* FIXME Account all the transactons (ls etc) using a different
|
||||
Roundtripper wrapper which wraps the transactions?
|
||||
* FIXME write tests for local file system
|
||||
* FIXME implement tests for single file operations in rclonetest
|
||||
* Need to make directory objects otherwise can't upload an empty directory
|
||||
* Or could upload empty directories only?
|
||||
|
||||
More rsync features
|
||||
* include
|
||||
* exclude
|
||||
* max size
|
||||
* bandwidth limit
|
||||
* -c, --checksum skip based on checksum, not mod-time & size
|
||||
|
||||
Ideas for flags
|
||||
* --retries N flag which would make rclone retry a sync until successful or it tried N times.
|
||||
|
||||
Ideas
|
||||
* could do encryption - put IV into metadata?
|
||||
@@ -55,13 +48,6 @@ Ideas
|
||||
* control times sync (which is slow with some remotes) with -a --archive flag?
|
||||
* Copy a glob pattern - could do with LimitedFs
|
||||
|
||||
s3
|
||||
* Can maybe set last modified?
|
||||
* https://forums.aws.amazon.com/message.jspa?messageID=214062
|
||||
* Otherwise can set metadata
|
||||
* Returns etag and last modified in bucket list
|
||||
|
||||
Bugs
|
||||
* Non verbose - not sure number transferred got counted up? CHECK
|
||||
* When doing copy it recurses the whole of the destination FS which isn't necessary
|
||||
|
||||
|
||||
62
rclone.go
62
rclone.go
@@ -28,8 +28,9 @@ import (
|
||||
var (
|
||||
// Flags
|
||||
cpuprofile = pflag.StringP("cpuprofile", "", "", "Write cpu profile to file")
|
||||
statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats")
|
||||
statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats (0 to disable)")
|
||||
version = pflag.BoolP("version", "V", false, "Print the version number")
|
||||
logFile = pflag.StringP("log-file", "", "", "Log everything to this file")
|
||||
)
|
||||
|
||||
type Command struct {
|
||||
@@ -58,10 +59,10 @@ func (cmd *Command) checkArgs(args []string) {
|
||||
var Commands = []Command{
|
||||
{
|
||||
Name: "copy",
|
||||
ArgsHelp: "source://path dest://path",
|
||||
ArgsHelp: "source:path dest:path",
|
||||
Help: `
|
||||
Copy the source to the destination. Doesn't transfer
|
||||
unchanged files, testing first by modification time then by
|
||||
unchanged files, testing by size and modification time or
|
||||
MD5SUM. Doesn't delete files from the destination.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
err := fs.Sync(fdst, fsrc, false)
|
||||
@@ -74,13 +75,13 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "sync",
|
||||
ArgsHelp: "source://path dest://path",
|
||||
ArgsHelp: "source:path dest:path",
|
||||
Help: `
|
||||
Sync the source to the destination. Doesn't transfer
|
||||
unchanged files, testing first by modification time then by
|
||||
MD5SUM. Deletes any files that exist in source that don't
|
||||
exist in destination. Since this can cause data loss, test
|
||||
first with the --dry-run flag.`,
|
||||
Sync the source to the destination, changing the destination
|
||||
only. Doesn't transfer unchanged files, testing by size and
|
||||
modification time or MD5SUM. Destination is updated to match
|
||||
source, including deleting files if necessary. Since this can
|
||||
cause data loss, test first with the --dry-run flag.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
err := fs.Sync(fdst, fsrc, true)
|
||||
if err != nil {
|
||||
@@ -92,7 +93,7 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "ls",
|
||||
ArgsHelp: "[remote://path]",
|
||||
ArgsHelp: "[remote:path]",
|
||||
Help: `
|
||||
List all the objects in the the path with size and path.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
@@ -106,7 +107,7 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "lsd",
|
||||
ArgsHelp: "[remote://path]",
|
||||
ArgsHelp: "[remote:path]",
|
||||
Help: `
|
||||
List all directories/containers/buckets in the the path.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
@@ -120,9 +121,10 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "lsl",
|
||||
ArgsHelp: "[remote://path]",
|
||||
ArgsHelp: "[remote:path]",
|
||||
Help: `
|
||||
List all the objects in the the path with modification time, size and path.`,
|
||||
List all the objects in the the path with modification time,
|
||||
size and path.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
err := fs.ListLong(fdst, os.Stdout)
|
||||
if err != nil {
|
||||
@@ -134,9 +136,10 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "md5sum",
|
||||
ArgsHelp: "[remote://path]",
|
||||
ArgsHelp: "[remote:path]",
|
||||
Help: `
|
||||
Produces an md5sum file for all the objects in the path.`,
|
||||
Produces an md5sum file for all the objects in the path. This
|
||||
is in the same format as the standard md5sum tool produces.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
err := fs.Md5sum(fdst, os.Stdout)
|
||||
if err != nil {
|
||||
@@ -148,7 +151,7 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "mkdir",
|
||||
ArgsHelp: "remote://path",
|
||||
ArgsHelp: "remote:path",
|
||||
Help: `
|
||||
Make the path if it doesn't already exist`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
@@ -162,7 +165,7 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "rmdir",
|
||||
ArgsHelp: "remote://path",
|
||||
ArgsHelp: "remote:path",
|
||||
Help: `
|
||||
Remove the path. Note that you can't remove a path with
|
||||
objects in it, use purge for that.`,
|
||||
@@ -177,7 +180,7 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "purge",
|
||||
ArgsHelp: "remote://path",
|
||||
ArgsHelp: "remote:path",
|
||||
Help: `
|
||||
Remove the path and all of its contents.`,
|
||||
Run: func(fdst, fsrc fs.Fs) {
|
||||
@@ -191,7 +194,7 @@ var Commands = []Command{
|
||||
},
|
||||
{
|
||||
Name: "check",
|
||||
ArgsHelp: "source://path dest://path",
|
||||
ArgsHelp: "source:path dest:path",
|
||||
Help: `
|
||||
Checks the files in the source and destination match. It
|
||||
compares sizes and MD5SUMs and prints a report of files which
|
||||
@@ -240,7 +243,8 @@ Subcommands:
|
||||
fmt.Fprintf(os.Stderr, "Options:\n")
|
||||
pflag.PrintDefaults()
|
||||
fmt.Fprintf(os.Stderr, `
|
||||
It is only necessary to use a unique prefix of the subcommand, eg 'up' for 'upload'.
|
||||
It is only necessary to use a unique prefix of the subcommand, eg 'up'
|
||||
for 'upload'.
|
||||
`)
|
||||
}
|
||||
|
||||
@@ -323,6 +327,9 @@ func NewFs(remote string) fs.Fs {
|
||||
|
||||
// Print the stats every statsInterval
|
||||
func StartStats() {
|
||||
if *statsInterval <= 0 {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
ch := time.Tick(*statsInterval)
|
||||
for {
|
||||
@@ -340,6 +347,17 @@ func main() {
|
||||
}
|
||||
command, args := ParseCommand()
|
||||
|
||||
// Log file output
|
||||
if *logFile != "" {
|
||||
f, err := os.OpenFile(*logFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open log file: %v", err)
|
||||
}
|
||||
f.Seek(0, os.SEEK_END)
|
||||
log.SetOutput(f)
|
||||
redirectStderr(f)
|
||||
}
|
||||
|
||||
// Make source and destination fs
|
||||
var fdst, fsrc fs.Fs
|
||||
if len(args) >= 1 {
|
||||
@@ -360,10 +378,10 @@ func main() {
|
||||
if command.Run != nil {
|
||||
command.Run(fdst, fsrc)
|
||||
if !command.NoStats {
|
||||
fmt.Println(fs.Stats)
|
||||
fmt.Fprintln(os.Stderr, fs.Stats)
|
||||
}
|
||||
if fs.Config.Verbose {
|
||||
log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine())
|
||||
fs.Debug(nil, "Go routines at exit %d\n", runtime.NumGoroutine())
|
||||
}
|
||||
if fs.Stats.Errored() {
|
||||
os.Exit(1)
|
||||
|
||||
15
redirect_stderr.go
Normal file
15
redirect_stderr.go
Normal file
@@ -0,0 +1,15 @@
|
||||
// Log the panic to the log file - for oses which can't do this
|
||||
|
||||
//+build !windows,!unix
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
// redirectStderr to the file passed in
|
||||
func redirectStderr(f *os.File) {
|
||||
log.Printf("Can't redirect stderr to file")
|
||||
}
|
||||
19
redirect_stderr_unix.go
Normal file
19
redirect_stderr_unix.go
Normal file
@@ -0,0 +1,19 @@
|
||||
// Log the panic under unix to the log file
|
||||
|
||||
//+build unix
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// redirectStderr to the file passed in
|
||||
func redirectStderr(f *os.File) {
|
||||
err := syscall.Dup2(int(f.Fd()), int(os.Stderr.Fd()))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to redirect stderr to file: %v", err)
|
||||
}
|
||||
}
|
||||
39
redirect_stderr_windows.go
Normal file
39
redirect_stderr_windows.go
Normal file
@@ -0,0 +1,39 @@
|
||||
// Log the panic under windows to the log file
|
||||
//
|
||||
// Code from minix, via
|
||||
//
|
||||
// http://play.golang.org/p/kLtct7lSUg
|
||||
|
||||
//+build windows
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
var (
|
||||
kernel32 = syscall.MustLoadDLL("kernel32.dll")
|
||||
procSetStdHandle = kernel32.MustFindProc("SetStdHandle")
|
||||
)
|
||||
|
||||
func setStdHandle(stdhandle int32, handle syscall.Handle) error {
|
||||
r0, _, e1 := syscall.Syscall(procSetStdHandle.Addr(), 2, uintptr(stdhandle), uintptr(handle), 0)
|
||||
if r0 == 0 {
|
||||
if e1 != 0 {
|
||||
return error(e1)
|
||||
}
|
||||
return syscall.EINVAL
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// redirectStderr to the file passed in
|
||||
func redirectStderr(f *os.File) {
|
||||
err := setStdHandle(syscall.STD_ERROR_HANDLE, syscall.Handle(f.Fd()))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to redirect stderr to file: %v", err)
|
||||
}
|
||||
}
|
||||
85
s3/s3.go
85
s3/s3.go
@@ -7,7 +7,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
@@ -101,7 +100,8 @@ func init() {
|
||||
|
||||
// Constants
|
||||
const (
|
||||
metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in
|
||||
metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in
|
||||
listChunkSize = 1024 // number of items to read at once
|
||||
)
|
||||
|
||||
// FsS3 represents a remote s3 server
|
||||
@@ -184,6 +184,7 @@ func s3Connection(name string) (*s3.S3, error) {
|
||||
}
|
||||
|
||||
c := s3.New(auth, region)
|
||||
c.Client = fs.Config.Client()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -267,36 +268,46 @@ func (f *FsS3) list(directories bool, fn func(string, *s3.Key)) {
|
||||
if directories {
|
||||
delimiter = "/"
|
||||
}
|
||||
// FIXME need to implement ALL loop
|
||||
objects, err := f.b.List(f.root, delimiter, "", 10000)
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
fs.Log(f, "Couldn't read bucket %q: %s", f.bucket, err)
|
||||
} else {
|
||||
rootLength := len(f.root)
|
||||
if directories {
|
||||
for _, remote := range objects.CommonPrefixes {
|
||||
if !strings.HasPrefix(remote, f.root) {
|
||||
fs.Log(f, "Odd name received %q", remote)
|
||||
continue
|
||||
}
|
||||
remote := remote[rootLength:]
|
||||
if strings.HasSuffix(remote, "/") {
|
||||
remote = remote[:len(remote)-1]
|
||||
}
|
||||
fn(remote, &s3.Key{Key: remote})
|
||||
}
|
||||
marker := ""
|
||||
for {
|
||||
objects, err := f.b.List(f.root, delimiter, marker, listChunkSize)
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
fs.Log(f, "Couldn't read bucket %q: %s", f.bucket, err)
|
||||
} else {
|
||||
for i := range objects.Contents {
|
||||
object := &objects.Contents[i]
|
||||
if !strings.HasPrefix(object.Key, f.root) {
|
||||
fs.Log(f, "Odd name received %q", object.Key)
|
||||
continue
|
||||
rootLength := len(f.root)
|
||||
if directories {
|
||||
for _, remote := range objects.CommonPrefixes {
|
||||
if !strings.HasPrefix(remote, f.root) {
|
||||
fs.Log(f, "Odd name received %q", remote)
|
||||
continue
|
||||
}
|
||||
remote := remote[rootLength:]
|
||||
if strings.HasSuffix(remote, "/") {
|
||||
remote = remote[:len(remote)-1]
|
||||
}
|
||||
fn(remote, &s3.Key{Key: remote})
|
||||
}
|
||||
} else {
|
||||
for i := range objects.Contents {
|
||||
object := &objects.Contents[i]
|
||||
if !strings.HasPrefix(object.Key, f.root) {
|
||||
fs.Log(f, "Odd name received %q", object.Key)
|
||||
continue
|
||||
}
|
||||
remote := object.Key[rootLength:]
|
||||
fn(remote, object)
|
||||
}
|
||||
remote := object.Key[rootLength:]
|
||||
fn(remote, object)
|
||||
}
|
||||
}
|
||||
if !objects.IsTruncated {
|
||||
break
|
||||
}
|
||||
// Use NextMarker if set, otherwise use last Key
|
||||
marker = objects.NextMarker
|
||||
if marker == "" {
|
||||
marker = objects.Contents[len(objects.Contents)-1].Key
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -409,9 +420,17 @@ func (o *FsObjectS3) Remote() string {
|
||||
return o.remote
|
||||
}
|
||||
|
||||
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
|
||||
|
||||
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
||||
func (o *FsObjectS3) Md5sum() (string, error) {
|
||||
return strings.Trim(strings.ToLower(o.etag), `"`), nil
|
||||
etag := strings.Trim(strings.ToLower(o.etag), `"`)
|
||||
// Check the etag is a valid md5sum
|
||||
if !matchMd5.MatchString(etag) {
|
||||
fs.Debug(o, "Invalid md5sum (probably multipart uploaded) - ignoring: %q", etag)
|
||||
return "", nil
|
||||
}
|
||||
return etag, nil
|
||||
}
|
||||
|
||||
// Size returns the size of an object in bytes
|
||||
@@ -525,13 +544,7 @@ func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error {
|
||||
metaMtime: swift.TimeToFloatString(modTime),
|
||||
}
|
||||
|
||||
// Guess the content type
|
||||
contentType := mime.TypeByExtension(path.Ext(o.remote))
|
||||
if contentType == "" {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
|
||||
_, err := o.s3.b.PutReaderHeaders(o.s3.root+o.remote, in, size, contentType, o.s3.perm, headers)
|
||||
_, err := o.s3.b.PutReaderHeaders(o.s3.root+o.remote, in, size, fs.MimeType(o), o.s3.perm, headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -47,6 +47,9 @@ func init() {
|
||||
}, {
|
||||
Name: "tenant",
|
||||
Help: "Tenant name - optional",
|
||||
}, {
|
||||
Name: "region",
|
||||
Help: "Region name - optional",
|
||||
},
|
||||
// snet = flag.Bool("swift-snet", false, "Use internal service network") // FIXME not implemented
|
||||
},
|
||||
@@ -110,11 +113,15 @@ func swiftConnection(name string) (*swift.Connection, error) {
|
||||
return nil, errors.New("auth not found")
|
||||
}
|
||||
c := &swift.Connection{
|
||||
UserName: userName,
|
||||
ApiKey: apiKey,
|
||||
AuthUrl: authUrl,
|
||||
UserAgent: fs.UserAgent,
|
||||
Tenant: fs.ConfigFile.MustValue(name, "tenant"),
|
||||
UserName: userName,
|
||||
ApiKey: apiKey,
|
||||
AuthUrl: authUrl,
|
||||
UserAgent: fs.UserAgent,
|
||||
Tenant: fs.ConfigFile.MustValue(name, "tenant"),
|
||||
Region: fs.ConfigFile.MustValue(name, "region"),
|
||||
ConnectTimeout: 10 * fs.Config.ConnectTimeout, // Use the timeouts in the transport
|
||||
Timeout: 10 * fs.Config.Timeout, // Use the timeouts in the transport
|
||||
Transport: fs.Config.Transport(),
|
||||
}
|
||||
err := c.Authenticate()
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user