
Compare commits


29 Commits
v1.09 ... v1.13

Author SHA1 Message Date
Nick Craig-Wood
88293a4b8a Version v1.13 2015-05-10 12:39:06 +01:00
Nick Craig-Wood
981104519e Revise documentation (especially sync) - fixes #39 2015-05-10 12:17:24 +01:00
Nick Craig-Wood
1d254a3674 Implement --timeout and --conntimeout - fixes #54
NB dropbox still to do
2015-05-10 11:29:55 +01:00
Nick Craig-Wood
f88d171afd s3: ignore etags from multipart uploads which aren't md5sums - fixes #56 2015-05-10 11:29:55 +01:00
Nick Craig-Wood
ba2091725e Version v1.12 2015-03-15 15:55:38 +00:00
Nick Craig-Wood
7c120b8bc5 drive: add --drive-chunk-size and --drive-upload-cutoff parameters 2015-03-15 15:27:55 +00:00
Nick Craig-Wood
5cc5429f99 drive: switch to insert from update when a failed copy deletes the upload 2015-03-15 15:27:55 +00:00
Nick Craig-Wood
09d71239b6 Make file size render more neatly and prevent from being < 0 2015-03-15 15:27:55 +00:00
Nick Craig-Wood
c643e4585e core: Log duplicate files if they are detected 2015-03-15 15:27:55 +00:00
Nick Craig-Wood
873db29391 Log all objects more informatively 2015-03-15 15:27:55 +00:00
Nick Craig-Wood
81a933ae38 drive: Use chunked upload for files - fixes #33 2015-03-15 15:27:55 +00:00
Nick Craig-Wood
ecb3c7bcef drive, googlecloudstorage: remove SeekWrapper after googleapi fix 2015-03-04 20:47:59 +00:00
Nick Craig-Wood
80000b904c Version v1.11 2015-03-04 17:59:31 +00:00
Nick Craig-Wood
c47c9cd440 swift: add region parameter - fixes #38 2015-03-04 17:09:53 +00:00
Nick Craig-Wood
b4a0941d4c In remote paths, change native directory separators to / - fixes #37 2015-03-02 17:04:34 +00:00
Nick Craig-Wood
c03d6a1ec3 drive: fix crash on failed to update remote mtime - fixes #36 2015-03-02 09:25:33 +00:00
Nick Craig-Wood
46d39ebaf7 Factor Mime Type guessing into fs.MimeType() 2015-03-02 09:21:15 +00:00
Nick Craig-Wood
fe68737268 Fix niggles found by go vet 2015-02-28 15:35:54 +00:00
Nick Craig-Wood
2360bf907a Add synchronization to list output to stop corruptions - fixes #29 2015-02-28 15:30:40 +00:00
Nick Craig-Wood
aa093e991e Ensure all stats/log messages to go stderr - fixes #30 2015-02-28 14:39:00 +00:00
Nick Craig-Wood
a5974999eb Update docs - fixes #32 2015-02-28 14:15:47 +00:00
Nick Craig-Wood
24a6ff54c2 Add --log-file flag to log everything (including panics) to file 2015-02-28 08:10:20 +00:00
Nick Craig-Wood
e89ea3360e Make it possible to disable stats printing with --stats=0 2015-02-27 15:22:26 +00:00
Nick Craig-Wood
85f8552c4d Tidy logging 2015-02-27 15:22:05 +00:00
Nick Craig-Wood
a287e3ced7 Implement --bwlimit to limit data transfer bandwidth 2015-02-27 15:03:47 +00:00
Nick Craig-Wood
8e4d8d13b8 drive: rename internal api 2015-02-20 09:51:07 +00:00
Nick Craig-Wood
cf208ad21b Version v1.10 2015-02-12 18:00:20 +00:00
Nick Craig-Wood
0faed16899 s3: list an unlimited number of items - fixes #22 2015-02-10 17:58:29 +00:00
Nick Craig-Wood
8d1c0ad07c Fix config loop - fixes #25 2015-02-10 16:48:04 +00:00
25 changed files with 956 additions and 316 deletions


@@ -75,28 +75,34 @@ Subcommands
rclone copy source:path dest:path
Copy the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
unchanged files, testing by size and modification time or
MD5SUM. Doesn't delete files from the destination.
rclone sync source:path dest:path
Sync the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
MD5SUM. Deletes any files that exist in source that don't
exist in destination. Since this can cause data loss, test
first with the `--dry-run` flag.
Sync the source to the destination, changing the destination
only. Doesn't transfer unchanged files, testing by size and
modification time or MD5SUM. Destination is updated to match
source, including deleting files if necessary. Since this can
cause data loss, test first with the `--dry-run` flag.
rclone ls [remote:path]
List all the objects in the path with sizes.
rclone lsl [remote:path]
List all the objects in the path with sizes and timestamps.
List all the objects in the path with size and path.
rclone lsd [remote:path]
List all directories/objects/buckets in the path.
List all directories/containers/buckets in the path.
rclone lsl [remote:path]
List all the objects in the path with modification time,
size and path.
rclone md5sum [remote:path]
Produces an md5sum file for all the objects in the path. This
is in the same format as the standard md5sum tool produces.
rclone mkdir remote:path
@@ -117,22 +123,30 @@ Checks the files in the source and destination match. It
compares sizes and MD5SUMs and prints a report of files which
don't match. It doesn't alter the source or destination.
rclone md5sum remote:path
rclone config
Produces an md5sum file for all the objects in the path. This is in
the same format as the standard md5sum tool produces.
Enter an interactive configuration session.
rclone help
This help.
General options:
```
--bwlimit=0: Bandwidth limit in kBytes/s, or use suffix k|M|G
--checkers=8: Number of checkers to run in parallel.
--config="~/.rclone.conf": Config file.
--contimeout=1m0s: Connect timeout
-n, --dry-run=false: Do a trial run with no permanent changes
--log-file="": Log everything to this file
--modify-window=1ns: Max time diff to be considered the same
-q, --quiet=false: Print as little stuff as possible
--stats=1m0s: Interval to print stats
--stats=1m0s: Interval to print stats (0 to disable)
--timeout=5m0s: IO idle timeout
--transfers=4: Number of file transfers to run in parallel.
-v, --verbose=false: Print lots more stuff
-V, --version=false: Print the version number
```
Developer options:
@@ -161,12 +175,7 @@ So to copy a local directory to a swift container called backup:
rclone sync /home/source swift:backup
The modified time is stored as metadata on the object as
`X-Object-Meta-Mtime` as floating point since the epoch.
This is a de facto standard (used in the official python-swiftclient
amongst others) for storing the modification time (as read using
os.Stat) for an object.
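For illustration, here is a minimal sketch in Go of producing that metadata value (editor's example; the file name is hypothetical and this is not rclone's actual code):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	fi, err := os.Stat("file.txt") // hypothetical local file
	if err != nil {
		panic(err)
	}
	// Floating point seconds since the epoch, as stored in
	// X-Object-Meta-Mtime, e.g. "1431257946.000000000".
	fmt.Printf("%.9f\n", float64(fi.ModTime().UnixNano())/1e9)
}
```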
For more help see the [online docs on Openstack Swift](http://rclone.org/swift).
Amazon S3
---------
@@ -178,8 +187,7 @@ So to copy a local directory to a s3 container called backup
rclone sync /home/source s3:backup
The modified time is stored as metadata on the object as
`X-Amz-Meta-Mtime` as floating point since the epoch.
For more help see the [online docs on Amazon S3](http://rclone.org/s3).
Google drive
------------
@@ -194,7 +202,7 @@ To copy a local directory to a drive directory called backup
rclone copy /home/source remote:backup
Google drive stores modification times accurate to 1 ms natively.
For more help see the [online docs on Google Drive](http://rclone.org/drive).
Dropbox
-------
@@ -209,10 +217,7 @@ To copy a local directory to a drive directory called backup
rclone copy /home/source dropbox:backup
Md5sums and timestamps in RFC3339 format accurate to 1ns are stored in
a Dropbox datastore called "rclone". Dropbox datastores are limited
to 100,000 rows so this is the maximum number of files rclone can
manage on Dropbox.
For more help see the [online docs on Dropbox](http://rclone.org/dropbox).
Google Cloud Storage
--------------------
@@ -228,9 +233,7 @@ To copy a local directory to a google cloud storage directory called backup
rclone copy /home/source remote:backup
Google Cloud Storage stores md5sums natively and rclone stores
modification times as metadata on the object, under the "mtime" key in
RFC3339 format accurate to 1ns.
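A short sketch of that timestamp format (editor's example; the modification time is hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	modTime := time.Now() // hypothetical modification time
	// RFC3339 with nanosecond precision, e.g. "2015-05-10T12:39:06.123456789Z"
	fmt.Println(modTime.UTC().Format(time.RFC3339Nano))
}
```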
For more help see the [online docs on Google Cloud Storage](http://rclone.org/googlecloudstorage/).
Single file copies
------------------
@@ -259,6 +262,27 @@ Bugs
Changelog
---------
* v1.13 - 2015-05-10
* Revise documentation (especially sync)
* Implement --timeout and --conntimeout
* s3: ignore etags from multipart uploads which aren't md5sums
* v1.12 - 2015-03-15
* drive: Use chunked upload for files above a certain size
* drive: add --drive-chunk-size and --drive-upload-cutoff parameters
* drive: switch to insert from update when a failed copy deletes the upload
* core: Log duplicate files if they are detected
* v1.11 - 2015-03-04
* swift: add region parameter
* drive: fix crash on failed to update remote mtime
* In remote paths, change native directory separators to /
* Add synchronization to ls/lsl/lsd output to stop corruptions
* Ensure all stats/log messages to go stderr
* Add --log-file flag to log everything (including panics) to file
* Make it possible to disable stats printing with --stats=0
* Implement --bwlimit to limit data transfer bandwidth
* v1.10 - 2015-02-12
* s3: list an unlimited number of items
* Fix getting stuck in the configurator
* v1.09 - 2015-02-07
* windows: Stop drive letters (eg C:) getting mixed up with remotes (eg drive:)
* local: Fix directory separators on Windows
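Taken together, the flags introduced across v1.11 to v1.13 look like this in use (editor's example; the paths and values are hypothetical):

    rclone --bwlimit 1M --contimeout 30s --timeout 5m --log-file rclone.log --stats=0 --dry-run sync /home/source remote:backup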


@@ -58,28 +58,34 @@ Subcommands
rclone copy source:path dest:path
Copy the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
unchanged files, testing by size and modification time or
MD5SUM. Doesn't delete files from the destination.
rclone sync source:path dest:path
Sync the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
MD5SUM. Deletes any files that exist in source that don't
exist in destination. Since this can cause data loss, test
first with the -dry-run flag.
Sync the source to the destination, changing the destination
only. Doesn't transfer unchanged files, testing by size and
modification time or MD5SUM. Destination is updated to match
source, including deleting files if necessary. Since this can
cause data loss, test first with the `--dry-run` flag.
rclone ls [remote:path]
List all the objects in the path with sizes.
rclone lsl [remote:path]
List all the objects in the path with sizes and timestamps.
List all the objects in the path with size and path.
rclone lsd [remote:path]
List all directories/objects/buckets in the path.
List all directories/containers/buckets in the path.
rclone lsl [remote:path]
List all the objects in the path with modification time,
size and path.
rclone md5sum [remote:path]
Produces an md5sum file for all the objects in the path. This
is in the same format as the standard md5sum tool produces.
rclone mkdir remote:path
@@ -100,21 +106,28 @@ Checks the files in the source and destination match. It
compares sizes and MD5SUMs and prints a report of files which
don't match. It doesn't alter the source or destination.
rclone md5sum remote:path
rclone config
Produces an md5sum file for all the objects in the path. This is in
the same format as the standard md5sum tool produces.
General options:
Enter an interactive configuration session.
rclone help
This help.
```
--bwlimit=0: Bandwidth limit in kBytes/s, or use suffix k|M|G
--checkers=8: Number of checkers to run in parallel.
--transfers=4: Number of file transfers to run in parallel.
--config="~/.rclone.conf": Config file.
--contimeout=1m0s: Connect timeout
-n, --dry-run=false: Do a trial run with no permanent changes
--log-file="": Log everything to this file
--modify-window=1ns: Max time diff to be considered the same
-q, --quiet=false: Print as little stuff as possible
--stats=1m0s: Interval to print stats
--stats=1m0s: Interval to print stats (0 to disable)
--timeout=5m0s: IO idle timeout
--transfers=4: Number of file transfers to run in parallel.
-v, --verbose=false: Print lots more stuff
-V, --version=false: Print the version number
```
Developer options:


@@ -2,34 +2,34 @@
title: "Rclone downloads"
description: "Download rclone binaries for your OS."
type: page
date: "2015-02-07"
date: "2015-05-10"
---
Rclone Download v1.09
Rclone Download v1.13
=====================
* Windows
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-windows-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-windows-amd64.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-windows-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-windows-amd64.zip)
* OSX
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-osx-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-osx-amd64.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-osx-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-osx-amd64.zip)
* Linux
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-linux-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-linux-amd64.zip)
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.09-linux-arm.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-linux-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-linux-amd64.zip)
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-linux-arm.zip)
* FreeBSD
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-freebsd-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-freebsd-amd64.zip)
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.09-freebsd-arm.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-amd64.zip)
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-freebsd-arm.zip)
* NetBSD
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-netbsd-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-netbsd-amd64.zip)
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.09-netbsd-arm.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-amd64.zip)
* [ARM - 32 Bit](http://downloads.rclone.org/rclone-v1.13-netbsd-arm.zip)
* OpenBSD
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-openbsd-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.09-openbsd-amd64.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-openbsd-386.zip)
* [AMD64 - 64 Bit](http://downloads.rclone.org/rclone-v1.13-openbsd-amd64.zip)
* Plan 9
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.09-plan9-386.zip)
* [386 - 32 Bit](http://downloads.rclone.org/rclone-v1.13-plan9-386.zip)
Older downloads can be found [here](http://downloads.rclone.org/)


@@ -1,7 +1,7 @@
---
title: "Google drive"
description: "Rclone docs for Google drive"
date: "2014-04-26"
date: "2015-05-10"
---
<i class="fa fa-google"></i> Google Drive
@@ -73,3 +73,16 @@ Modified time
-------------
Google drive stores modification times accurate to 1 ms.
Revisions
---------
Google drive stores revisions of files. When you upload a change to
an existing file on google drive using rclone, it will create a new
revision of that file.
Revisions follow the standard google policy, which at the time of
writing was:
* They are deleted after 30 days or 100 revisions (whatever comes first).
* They do not count towards a user storage quota.


@@ -10,10 +10,7 @@ package drive
import (
"fmt"
"io"
"log"
"mime"
"net/http"
"path"
"strings"
"sync"
"time"
@@ -42,6 +39,10 @@ const (
var (
// Flags
driveFullList = pflag.BoolP("drive-full-list", "", true, "Use a full listing for directory list. More data but usually quicker.")
// chunkSize is the size of the chunks created during a resumable upload and should be a power of two.
// 1<<18 is the minimum size supported by the Google uploader, and there is no maximum.
chunkSize = fs.SizeSuffix(256 * 1024)
driveUploadCutoff = chunkSize
// Description of how to auth for this app
driveAuth = &googleauth.Auth{
Scope: "https://www.googleapis.com/auth/drive",
@@ -66,6 +67,8 @@ func init() {
Help: "Google Application Client Secret - leave blank to use rclone's.",
}},
})
pflag.VarP(&driveUploadCutoff, "drive-upload-cutoff", "", "Cutoff for switching to chunked upload")
pflag.VarP(&chunkSize, "drive-chunk-size", "", "Upload chunk size. Must be a power of 2 >= 256k.")
}
// FsDrive represents a remote drive server
@@ -77,7 +80,7 @@ type FsDrive struct {
rootId string // Id of the root directory
foundRoot bool // Whether we have found the root or not
findRootLock sync.Mutex // Protect findRoot from concurrent use
dirCache dirCache // Map of directory path to directory id
dirCache *dirCache // Map of directory path to directory id
findDirLock sync.Mutex // Protect findDir from concurrent use
pacer chan struct{} // To pace the operations
sleepTime time.Duration // Time to sleep for each transaction
@@ -102,8 +105,8 @@ type dirCache struct {
}
// Make a new locked map
func newDirCache() dirCache {
d := dirCache{}
func newDirCache() *dirCache {
d := &dirCache{}
d.Flush()
return d
}
@@ -147,8 +150,12 @@ func (f *FsDrive) String() string {
return fmt.Sprintf("Google drive root '%s'", f.root)
}
// Wait for the pace
func (f *FsDrive) paceWait() {
// Start a call to the drive API
//
// This must be called as a pair with endCall
//
// This waits for the pacer token
func (f *FsDrive) beginCall() {
// pacer starts with a token in and whenever we take one out
// XXX ms later we put another in. We could do this with a
// Ticker more accurately, but then we'd have to work out how
@@ -163,12 +170,14 @@ func (f *FsDrive) paceWait() {
}(f.sleepTime)
}
// End a call to the drive API
//
// Refresh the pace given an error that was returned. It returns a
// boolean as to whether the operation should be retried.
//
// See https://developers.google.com/drive/web/handle-errors
// http://stackoverflow.com/questions/18529524/403-rate-limit-after-only-1-insert-per-second
func (f *FsDrive) paceRefresh(err error) bool {
func (f *FsDrive) endCall(err error) bool {
again := false
oldSleepTime := f.sleepTime
if err == nil {
@@ -180,24 +189,41 @@ func (f *FsDrive) paceRefresh(err error) bool {
fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
}
} else {
fs.Debug(f, "Error recived: %v", err)
if gerr, ok := err.(*googleapi.Error); ok {
if len(gerr.Errors) > 0 {
fs.Debug(f, "Error recived: %T %#v", err, err)
// Check for net error Timeout()
if x, ok := err.(interface {
Timeout() bool
}); ok && x.Timeout() {
again = true
}
// Check for net error Temporary()
if x, ok := err.(interface {
Temporary() bool
}); ok && x.Temporary() {
again = true
}
switch gerr := err.(type) {
case *googleapi.Error:
if gerr.Code >= 500 && gerr.Code < 600 {
// All 5xx errors should be retried
again = true
} else if len(gerr.Errors) > 0 {
reason := gerr.Errors[0].Reason
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
f.sleepTime *= 2
if f.sleepTime > maxSleep {
f.sleepTime = maxSleep
}
if f.sleepTime != oldSleepTime {
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
}
again = true
}
}
}
}
if again {
f.sleepTime *= 2
if f.sleepTime > maxSleep {
f.sleepTime = maxSleep
}
if f.sleepTime != oldSleepTime {
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
}
}
return again
}
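// Editor's illustration: because endCall doubles sleepTime on each
// rate-limit or transient error (capped at maxSleep), successive waits
// grow geometrically (t, 2t, 4t, ...), i.e. exponential backoff.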
@@ -205,11 +231,11 @@ func (f *FsDrive) paceRefresh(err error) bool {
// on 403 rate limit exceeded
//
// This calls fn, expecting it to place its error in perr
func (f *FsDrive) pace(perr *error, fn func()) {
func (f *FsDrive) call(perr *error, fn func()) {
for {
f.paceWait()
f.beginCall()
fn()
if !f.paceRefresh(*perr) {
if !f.endCall(*perr) {
break
}
}
@@ -253,7 +279,7 @@ func (f *FsDrive) listAll(dirId string, title string, directoriesOnly bool, file
OUTER:
for {
var files *drive.FileList
f.pace(&err, func() {
f.call(&err, func() {
files, err = list.Do()
})
if err != nil {
@@ -273,8 +299,27 @@ OUTER:
return
}
// Returns true if x is a power of 2 or zero
func isPowerOfTwo(x int64) bool {
switch {
case x == 0:
return true
case x < 0:
return false
default:
return (x & (x - 1)) == 0
}
}
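// Editor's illustration: a power of two has a single set bit, so
// x&(x-1) clears it to zero, e.g. 8 (0b1000) & 7 (0b0111) == 0,
// whereas 6 (0b110) & 5 (0b101) == 4 != 0.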
// NewFs constructs an FsDrive from the path, container:path
func NewFs(name, path string) (fs.Fs, error) {
if !isPowerOfTwo(int64(chunkSize)) {
return nil, fmt.Errorf("drive: chunk size %v isn't a power of two", chunkSize)
}
if chunkSize < 256*1024 {
return nil, fmt.Errorf("drive: chunk size can't be less than 256k - was %v", chunkSize)
}
t, err := driveAuth.NewTransport(name)
if err != nil {
return nil, err
@@ -303,7 +348,7 @@ func NewFs(name, path string) (fs.Fs, error) {
}
// Read About so we know the root path
f.pace(&err, func() {
f.call(&err, func() {
f.about, err = f.svc.About.Get().Do()
})
if err != nil {
@@ -572,7 +617,7 @@ func (f *FsDrive) _findDir(path string, create bool) (pathId string, err error)
Parents: []*drive.ParentReference{{Id: pathId}},
}
var info *drive.File
f.pace(&err, func() {
f.call(&err, func() {
info, err = f.svc.Files.Insert(createInfo).Do()
})
if err != nil {
@@ -636,7 +681,7 @@ func (f *FsDrive) List() fs.ObjectsChan {
err := f.findRoot(false)
if err != nil {
fs.Stats.Error()
log.Printf("Couldn't find root: %s", err)
fs.Log(f, "Couldn't find root: %s", err)
} else {
if f.root == "" && *driveFullList {
err = f.listDirFull(f.rootId, "", out)
@@ -645,7 +690,7 @@ func (f *FsDrive) List() fs.ObjectsChan {
}
if err != nil {
fs.Stats.Error()
log.Printf("List failed: %s", err)
fs.Log(f, "List failed: %s", err)
}
}
}()
@@ -660,7 +705,7 @@ func (f *FsDrive) ListDir() fs.DirChan {
err := f.findRoot(false)
if err != nil {
fs.Stats.Error()
log.Printf("Couldn't find root: %s", err)
fs.Log(f, "Couldn't find root: %s", err)
} else {
_, err := f.listAll(f.rootId, "", true, false, func(item *drive.File) bool {
dir := &fs.Dir{
@@ -674,7 +719,7 @@ func (f *FsDrive) ListDir() fs.DirChan {
})
if err != nil {
fs.Stats.Error()
log.Printf("ListDir failed: %s", err)
fs.Log(f, "ListDir failed: %s", err)
}
}
}()
@@ -700,33 +745,33 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
return o, fmt.Errorf("Couldn't find or make directory: %s", err)
}
// Guess the mime type
mimeType := mime.TypeByExtension(path.Ext(o.remote))
if mimeType == "" {
mimeType = "application/octet-stream"
}
modifiedDate := modTime.Format(timeFormatOut)
// Define the metadata for the file we are going to create.
createInfo := &drive.File{
Title: leaf,
Description: leaf,
Parents: []*drive.ParentReference{{Id: directoryId}},
MimeType: mimeType,
ModifiedDate: modifiedDate,
MimeType: fs.MimeType(o),
ModifiedDate: modTime.Format(timeFormatOut),
}
// Make the API request to upload metadata and file data.
in = &fs.SeekWrapper{In: in, Size: size}
var info *drive.File
// Don't retry, return a retry error instead
f.paceWait()
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
if f.paceRefresh(err) {
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
}
if err != nil {
return o, fmt.Errorf("Upload failed: %s", err)
if size == 0 || size < int64(driveUploadCutoff) {
// Make the API request to upload metadata and file data.
// Don't retry, return a retry error instead
f.beginCall()
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
if f.endCall(err) {
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
}
if err != nil {
return o, fmt.Errorf("Upload failed: %s", err)
}
} else {
// Upload the file in chunks
info, err = f.Upload(in, size, createInfo.MimeType, createInfo, remote)
if err != nil {
return o, err
}
}
o.setMetaData(info)
return o, nil
@@ -746,7 +791,7 @@ func (f *FsDrive) Rmdir() error {
return err
}
var children *drive.ChildList
f.pace(&err, func() {
f.call(&err, func() {
children, err = f.svc.Children.List(f.rootId).MaxResults(10).Do()
})
if err != nil {
@@ -757,7 +802,7 @@ func (f *FsDrive) Rmdir() error {
}
// Delete the directory if it isn't the root
if f.root != "" {
f.pace(&err, func() {
f.call(&err, func() {
err = f.svc.Files.Delete(f.rootId).Do()
})
if err != nil {
@@ -786,7 +831,7 @@ func (f *FsDrive) Purge() error {
if err != nil {
return err
}
f.pace(&err, func() {
f.call(&err, func() {
err = f.svc.Files.Delete(f.rootId).Do()
})
f.resetRoot()
@@ -898,12 +943,13 @@ func (o *FsObjectDrive) SetModTime(modTime time.Time) {
}
// Set modified date
var info *drive.File
o.drive.pace(&err, func() {
o.drive.call(&err, func() {
info, err = o.drive.svc.Files.Update(o.id, updateInfo).SetModifiedDate(true).Do()
})
if err != nil {
fs.Stats.Error()
fs.Log(o, "Failed to update remote mtime: %s", err)
return
}
// Update info from read data
o.setMetaData(info)
@@ -922,7 +968,7 @@ func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) {
}
req.Header.Set("User-Agent", fs.UserAgent)
var res *http.Response
o.drive.pace(&err, func() {
o.drive.call(&err, func() {
res, err = o.drive.client.Do(req)
})
if err != nil {
@@ -947,17 +993,24 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
}
// Make the API request to upload metadata and file data.
in = &fs.SeekWrapper{In: in, Size: size}
var err error
var info *drive.File
// Don't retry, return a retry error instead
o.drive.paceWait()
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
if o.drive.paceRefresh(err) {
return fs.RetryErrorf("Update failed - retry: %s", err)
}
if err != nil {
return fmt.Errorf("Update failed: %s", err)
if size == 0 || size < int64(driveUploadCutoff) {
// Don't retry, return a retry error instead
o.drive.beginCall()
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
if o.drive.endCall(err) {
return fs.RetryErrorf("Update failed - retry: %s", err)
}
if err != nil {
return fmt.Errorf("Update failed: %s", err)
}
} else {
// Upload the file in chunks
info, err = o.drive.Upload(in, size, fs.MimeType(o), updateInfo, o.remote)
if err != nil {
return err
}
}
o.setMetaData(info)
return nil
@@ -966,7 +1019,7 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
// Remove an object
func (o *FsObjectDrive) Remove() error {
var err error
o.drive.pace(&err, func() {
o.drive.call(&err, func() {
err = o.drive.svc.Files.Delete(o.id).Do()
})
return err

drive/upload.go Normal file (246 lines)

@@ -0,0 +1,246 @@
// Upload for drive
//
// Docs
// Resumable upload: https://developers.google.com/drive/web/manage-uploads#resumable
// Best practices: https://developers.google.com/drive/web/manage-uploads#best-practices
// Files insert: https://developers.google.com/drive/v2/reference/files/insert
// Files update: https://developers.google.com/drive/v2/reference/files/update
//
// This contains code adapted from google.golang.org/api (C) the GO AUTHORS
package drive
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"github.com/ncw/rclone/fs"
"google.golang.org/api/drive/v2"
"google.golang.org/api/googleapi"
)
const (
// statusResumeIncomplete is the code returned by the Google uploader when the transfer is not yet complete.
statusResumeIncomplete = 308
// Number of times to try each chunk
maxTries = 10
)
// resumableUpload is used by the generated APIs to provide resumable uploads.
// It is not used by developers directly.
type resumableUpload struct {
f *FsDrive
remote string
// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
URI string
// Media is the object being uploaded.
Media io.Reader
// MediaType defines the media type, e.g. "image/jpeg".
MediaType string
// ContentLength is the full size of the object being uploaded.
ContentLength int64
// Return value
ret *drive.File
}
// Upload the io.Reader in of size bytes with contentType and info
func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *drive.File, remote string) (*drive.File, error) {
fileId := info.Id
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(info)
if err != nil {
return nil, err
}
params := make(url.Values)
params.Set("alt", "json")
params.Set("uploadType", "resumable")
urls := "https://www.googleapis.com/upload/drive/v2/files"
method := "POST"
if fileId != "" {
params.Set("setModifiedDate", "true")
urls += "/{fileId}"
method = "PUT"
}
urls += "?" + params.Encode()
req, _ := http.NewRequest(method, urls, body)
googleapi.Expand(req.URL, map[string]string{
"fileId": fileId,
})
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
req.Header.Set("X-Upload-Content-Type", contentType)
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
req.Header.Set("User-Agent", fs.UserAgent)
var res *http.Response
f.call(&err, func() {
res, err = f.client.Do(req)
if err == nil {
defer googleapi.CloseBody(res)
err = googleapi.CheckResponse(res)
}
})
if err != nil {
return nil, err
}
loc := res.Header.Get("Location")
rx := &resumableUpload{
f: f,
remote: remote,
URI: loc,
Media: in,
MediaType: contentType,
ContentLength: size,
}
return rx.Upload()
}
// Make an http.Request for the range passed in
func (rx *resumableUpload) makeRequest(start int64, body []byte) *http.Request {
reqSize := int64(len(body))
req, _ := http.NewRequest("POST", rx.URI, bytes.NewBuffer(body))
req.ContentLength = reqSize
if reqSize != 0 {
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength))
} else {
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength))
}
req.Header.Set("Content-Type", rx.MediaType)
req.Header.Set("User-Agent", fs.UserAgent)
return req
}
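// Editor's illustration (hypothetical numbers): sending the second
// 256 KiB chunk of a 1 MiB upload sets
// "Content-Range: bytes 262144-524287/1048576", while a zero-length
// status probe sets "Content-Range: bytes */1048576".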
// rangeRE matches the transfer status response from the server. $1 is
// the last byte index uploaded.
var rangeRE = regexp.MustCompile(`^0\-(\d+)$`)
// Query drive for the amount transferred so far
//
// If error is nil, then start should be valid
func (rx *resumableUpload) transferStatus() (start int64, err error) {
req := rx.makeRequest(0, nil)
res, err := rx.f.client.Do(req)
if err != nil {
return 0, err
}
defer googleapi.CloseBody(res)
if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK {
return rx.ContentLength, nil
}
if res.StatusCode != statusResumeIncomplete {
err = googleapi.CheckResponse(res)
if err != nil {
return 0, err
}
return 0, fmt.Errorf("unexpected http return code %v", res.StatusCode)
}
Range := res.Header.Get("Range")
if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 {
start, err = strconv.ParseInt(m[1], 10, 64)
if err == nil {
return start, nil
}
}
return 0, fmt.Errorf("unable to parse range %q", Range)
}
// Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil
func (rx *resumableUpload) transferChunk(start int64, body []byte) (int, error) {
req := rx.makeRequest(start, body)
res, err := rx.f.client.Do(req)
if err != nil {
return 599, err
}
defer googleapi.CloseBody(res)
if res.StatusCode == statusResumeIncomplete {
return res.StatusCode, nil
}
err = googleapi.CheckResponse(res)
if err != nil {
return res.StatusCode, err
}
// When the entire file upload is complete, the server
// responds with an HTTP 201 Created along with any metadata
// associated with this resource. If this request had been
// updating an existing entity rather than creating a new one,
// the HTTP response code for a completed upload would have
// been 200 OK.
//
// So parse the response out of the body. We aren't expecting
any other 2xx codes, so we parse it unconditionally on
// StatusCode
if err = json.NewDecoder(res.Body).Decode(&rx.ret); err != nil {
return 598, err
}
return res.StatusCode, nil
}
// Upload uploads the chunks from the input
// It retries each chunk maxTries times (with a pause of uploadPause between attempts).
func (rx *resumableUpload) Upload() (*drive.File, error) {
start := int64(0)
buf := make([]byte, chunkSize)
var StatusCode int
for start < rx.ContentLength {
reqSize := rx.ContentLength - start
if reqSize >= int64(chunkSize) {
reqSize = int64(chunkSize)
} else {
buf = buf[:reqSize]
}
// Read the chunk
_, err := io.ReadFull(rx.Media, buf)
if err != nil {
return nil, err
}
// Transfer the chunk
for try := 1; try <= maxTries; try++ {
fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries)
rx.f.beginCall()
StatusCode, err = rx.transferChunk(start, buf)
rx.f.endCall(err)
if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK {
goto success
}
fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err)
}
fs.Debug(rx.remote, "Failed to send chunk")
return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err)
success:
start += reqSize
}
// Resume or retry uploads that fail due to connection interruptions or
// any 5xx errors, including:
//
// 500 Internal Server Error
// 502 Bad Gateway
// 503 Service Unavailable
// 504 Gateway Timeout
//
// Use an exponential backoff strategy if any 5xx server error is
// returned when resuming or retrying upload requests. These errors can
// occur if a server is getting overloaded. Exponential backoff can help
// alleviate these kinds of problems during periods of high volume of
// requests or heavy network traffic. Other kinds of requests should not
// be handled by exponential backoff but you can still retry a number of
// them. When retrying these requests, limit the number of times you
// retry them. For example your code could limit to ten retries or less
// before reporting an error.
//
// Handle 404 Not Found errors when doing resumable uploads by starting
// the entire upload over from the beginning.
if rx.ret == nil {
return nil, fs.RetryErrorf("Incomplete upload - retry, last error %d", StatusCode)
}
return rx.ret, nil
}


@@ -17,6 +17,20 @@ This is a JSON decode error - from Update / UploadByChunk
- Caused by 500 error from dropbox
- See https://github.com/stacktic/dropbox/issues/1
- Possibly confusing dropbox with excess concurrency?
FIXME implement timeouts - need to get "github.com/stacktic/dropbox"
and hence "golang.org/x/oauth2" which uses DefaultTransport unless it
is set in the context passed into .Client()
func (db *Dropbox) client() *http.Client {
return db.config.Client(oauth2.NoContext, db.token)
}
// HTTPClient is the context key to use with golang.org/x/net/context's
// WithValue function to associate an *http.Client value with a context.
var HTTPClient ContextKey
So pass in a context with HTTPClient set...
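For illustration (editor's sketch; config and token stand for the
oauth2.Config and oauth2.Token in use):

	ctx := context.WithValue(oauth2.NoContext, oauth2.HTTPClient, fs.Config.Client())
	client := config.Client(ctx, token)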
*/
import (


@@ -10,13 +10,24 @@ import (
"strings"
"sync"
"time"
"github.com/tsenart/tb"
)
// Globals
var (
Stats = NewStats()
Stats = NewStats()
tokenBucket *tb.Bucket
)
// Start the token bucket if necessary
func startTokenBucket() {
if bwLimit > 0 {
tokenBucket = tb.NewBucket(int64(bwLimit), 100*time.Millisecond)
Log(nil, "Starting bandwidth limiter at %vBytes/s", &bwLimit)
}
}
// StringSet holds some strings
type StringSet map[string]bool
@@ -178,6 +189,10 @@ func (file *Account) Read(p []byte) (n int, err error) {
if err == io.EOF {
// FIXME Do something?
}
// Limit the transfer speed if required
if tokenBucket != nil {
tokenBucket.Wait(int64(n))
}
return
}
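The pattern above makes every Read account its byte count against a single shared bucket, so aggregate throughput over all connections converges on the configured rate. A standalone sketch of the same idea (editor's example, using the github.com/tsenart/tb API as imported above):

```go
package main

import (
	"time"

	"github.com/tsenart/tb"
)

func main() {
	// ~1 MiB/s, refilled in 100 ms slices, as startTokenBucket does.
	bucket := tb.NewBucket(1<<20, 100*time.Millisecond)
	for i := 0; i < 10; i++ {
		n := int64(256 * 1024) // pretend a Read just returned 256 KiB
		bucket.Wait(n)         // block until n tokens (bytes) are available
	}
}
```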


@@ -6,6 +6,8 @@ import (
"bufio"
"fmt"
"log"
"math"
"net/http"
"os"
"os/user"
"path"
@@ -15,6 +17,7 @@ import (
"time"
"github.com/Unknwon/goconfig"
"github.com/mreiferson/go-httpclient"
"github.com/ogier/pflag"
)
@@ -22,6 +25,8 @@ const (
configFileName = ".rclone.conf"
)
type SizeSuffix int64
// Global
var (
// Config file
@@ -33,23 +38,126 @@ var (
// Global config
Config = &ConfigInfo{}
// Flags
verbose = pflag.BoolP("verbose", "v", false, "Print lots more stuff")
quiet = pflag.BoolP("quiet", "q", false, "Print as little stuff as possible")
modifyWindow = pflag.DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same")
checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.")
transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
configFile = pflag.StringP("config", "", ConfigPath, "Config file.")
dryRun = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
verbose = pflag.BoolP("verbose", "v", false, "Print lots more stuff")
quiet = pflag.BoolP("quiet", "q", false, "Print as little stuff as possible")
modifyWindow = pflag.DurationP("modify-window", "", time.Nanosecond, "Max time diff to be considered the same")
checkers = pflag.IntP("checkers", "", 8, "Number of checkers to run in parallel.")
transfers = pflag.IntP("transfers", "", 4, "Number of file transfers to run in parallel.")
configFile = pflag.StringP("config", "", ConfigPath, "Config file.")
dryRun = pflag.BoolP("dry-run", "n", false, "Do a trial run with no permanent changes")
connectTimeout = pflag.DurationP("contimeout", "", 60*time.Second, "Connect timeout")
timeout = pflag.DurationP("timeout", "", 5*60*time.Second, "IO idle timeout")
bwLimit SizeSuffix
)
func init() {
pflag.VarP(&bwLimit, "bwlimit", "", "Bandwidth limit in kBytes/s, or use suffix k|M|G")
}
// Turn SizeSuffix into a string
func (x SizeSuffix) String() string {
scaled := float64(0)
suffix := ""
switch {
case x == 0:
return "0"
case x < 1024*1024:
scaled = float64(x) / 1024
suffix = "k"
case x < 1024*1024*1024:
scaled = float64(x) / 1024 / 1024
suffix = "M"
default:
scaled = float64(x) / 1024 / 1024 / 1024
suffix = "G"
}
if math.Floor(scaled) == scaled {
return fmt.Sprintf("%.0f%s", scaled, suffix)
}
return fmt.Sprintf("%.3f%s", scaled, suffix)
}
// Set a SizeSuffix
func (x *SizeSuffix) Set(s string) error {
if len(s) == 0 {
return fmt.Errorf("Empty string")
}
suffix := s[len(s)-1]
suffixLen := 1
var multiplier float64
switch suffix {
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.':
suffixLen = 0
multiplier = 1 << 10
case 'k', 'K':
multiplier = 1 << 10
case 'm', 'M':
multiplier = 1 << 20
case 'g', 'G':
multiplier = 1 << 30
default:
return fmt.Errorf("Bad suffix %q", suffix)
}
s = s[:len(s)-suffixLen]
value, err := strconv.ParseFloat(s, 64)
if err != nil {
return err
}
if value < 0 {
return fmt.Errorf("Size can't be negative %q", s)
}
value *= multiplier
*x = SizeSuffix(value)
return nil
}
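// Editor's illustration: "1M" parses to 1048576, "0.1k" to 102, and a
// bare "1" is treated as kBytes, i.e. 1024 (see fs/config_test.go below).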
// Check it satisfies the interface
var _ pflag.Value = (*SizeSuffix)(nil)
// Filesystem config options
type ConfigInfo struct {
Verbose bool
Quiet bool
DryRun bool
ModifyWindow time.Duration
Checkers int
Transfers int
Verbose bool
Quiet bool
DryRun bool
ModifyWindow time.Duration
Checkers int
Transfers int
ConnectTimeout time.Duration // Connect timeout
Timeout time.Duration // Data channel timeout
}
// Transport returns an http.RoundTripper with the correct timeouts
func (ci *ConfigInfo) Transport() http.RoundTripper {
return &httpclient.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConnsPerHost: ci.Checkers + ci.Transfers + 1,
// ConnectTimeout, if non-zero, is the maximum amount of time a dial will wait for
// a connect to complete.
ConnectTimeout: ci.ConnectTimeout,
// ResponseHeaderTimeout, if non-zero, specifies the amount of
// time to wait for a server's response headers after fully
// writing the request (including its body, if any). This
// time does not include the time to read the response body.
ResponseHeaderTimeout: ci.Timeout,
// RequestTimeout, if non-zero, specifies the amount of time for the entire
// request to complete (including all of the above timeouts + entire response body).
// This should never be less than the sum total of the above two timeouts.
//RequestTimeout: NOT SET,
// ReadWriteTimeout, if non-zero, will set a deadline for every Read and
// Write operation on the request connection.
ReadWriteTimeout: ci.Timeout,
}
}
// Client returns an http.Client with the correct timeouts
func (ci *ConfigInfo) Client() *http.Client {
return &http.Client{
Transport: ci.Transport(),
}
}
// Find the config directory
@@ -84,6 +192,8 @@ func LoadConfig() {
Config.Checkers = *checkers
Config.Transfers = *transfers
Config.DryRun = *dryRun
Config.Timeout = *timeout
Config.ConnectTimeout = *connectTimeout
ConfigPath = *configFile
@@ -97,6 +207,9 @@ func LoadConfig() {
log.Fatalf("Failed to read null config file: %v", err)
}
}
// Start the token bucket limiter
startTokenBucket()
}
// Save configuration file.
@@ -330,6 +443,7 @@ func EditConfig() {
name := ChooseRemote()
EditRemote(name)
case 'n':
nameLoop:
for {
fmt.Printf("name> ")
name := ReadLine()
@@ -340,7 +454,7 @@ func EditConfig() {
fmt.Printf("Can't use %q as it can be confused a drive letter\n", name)
default:
NewRemote(name)
break
break nameLoop
}
}
case 'd':

fs/config_test.go Normal file (57 lines)

@@ -0,0 +1,57 @@
package fs
import "testing"
func TestSizeSuffixString(t *testing.T) {
for _, test := range []struct {
in float64
want string
}{
{0, "0"},
{102, "0.100k"},
{1024, "1k"},
{1024 * 1024, "1M"},
{1024 * 1024 * 1024, "1G"},
{10 * 1024 * 1024 * 1024, "10G"},
{10.1 * 1024 * 1024 * 1024, "10.100G"},
} {
ss := SizeSuffix(test.in)
got := ss.String()
if test.want != got {
t.Errorf("Want %v got %v", test.want, got)
}
}
}
func TestSizeSuffixSet(t *testing.T) {
for i, test := range []struct {
in string
want int64
err bool
}{
{"0", 0, false},
{"0.1k", 102, false},
{"0.1", 102, false},
{"1K", 1024, false},
{"1", 1024, false},
{"2.5", 1024 * 2.5, false},
{"1M", 1024 * 1024, false},
{"1.g", 1024 * 1024 * 1024, false},
{"10G", 10 * 1024 * 1024 * 1024, false},
{"", 0, true},
{"1p", 0, true},
{"1.p", 0, true},
{"1p", 0, true},
{"-1K", 0, true},
} {
ss := SizeSuffix(0)
err := ss.Set(test.in)
if (err != nil) != test.err {
t.Errorf("%d: Expecting error %v but got error %v", i, test.err, err)
}
got := int64(ss)
if test.want != got {
t.Errorf("%d: Want %v got %v", i, test.want, got)
}
}
}


@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"path/filepath"
"regexp"
"time"
)
@@ -235,14 +236,16 @@ func NewFs(path string) (Fs, error) {
if err != nil {
return nil, err
}
// change native directory separators to / if there are any
fsPath = filepath.ToSlash(fsPath)
return fs.NewFs(configName, fsPath)
}
// Outputs log for object
func OutputLog(o interface{}, text string, args ...interface{}) {
description := ""
if x, ok := o.(fmt.Stringer); ok {
description = x.String() + ": "
if o != nil {
description = fmt.Sprintf("%v: ", o)
}
out := fmt.Sprintf(text, args...)
log.Print(description + out)


@@ -5,6 +5,8 @@ package fs
import (
"fmt"
"io"
"mime"
"path"
"sync"
)
@@ -97,15 +99,29 @@ func Equal(src, dst Object) bool {
return true
}
// Used to remove a failed copy
func removeFailedCopy(dst Object) {
if dst != nil {
Debug(dst, "Removing failed copy")
removeErr := dst.Remove()
if removeErr != nil {
Debug(dst, "Failed to remove failed copy: %s", removeErr)
}
// Returns a guess at the mime type from the extension
func MimeType(o Object) string {
mimeType := mime.TypeByExtension(path.Ext(o.Remote()))
if mimeType == "" {
mimeType = "application/octet-stream"
}
return mimeType
}
// Used to remove a failed copy
//
// Returns whether the file was successfully removed or not
func removeFailedCopy(dst Object) bool {
if dst == nil {
return false
}
Debug(dst, "Removing failed copy")
removeErr := dst.Remove()
if removeErr != nil {
Debug(dst, "Failed to remove failed copy: %s", removeErr)
return false
}
return true
}
// Copy src object to dst or f if nil
@@ -139,7 +155,11 @@ tryAgain:
if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries {
tries++
Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries)
removeFailedCopy(dst)
if removeFailedCopy(dst) {
// If we removed dst, then nil it out and note we are not updating
dst = nil
doUpdate = false
}
goto tryAgain
}
if err == nil {
@@ -259,6 +279,20 @@ func DeleteFiles(to_be_deleted ObjectsChan) {
wg.Wait()
}
// Read a map of Object.Remote to Object for the given Fs
func readFilesMap(fs Fs) map[string]Object {
files := make(map[string]Object)
for o := range fs.List() {
remote := o.Remote()
if _, ok := files[remote]; !ok {
files[remote] = o
} else {
Log(o, "Duplicate file detected")
}
}
return files
}
// Syncs fsrc into fdst
//
// If Delete is true then it deletes any files in fdst that aren't in fsrc
@@ -273,10 +307,7 @@ func Sync(fdst, fsrc Fs, Delete bool) error {
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
delFiles := make(map[string]Object)
for dst := range fdst.List() {
delFiles[dst.Remote()] = dst
}
delFiles := readFilesMap(fdst)
// Read source files checking them off against dest files
to_be_checked := make(ObjectPairChan, Config.Transfers)
@@ -341,22 +372,20 @@ func Check(fdst, fsrc Fs) error {
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
dstFiles := make(map[string]Object)
for dst := range fdst.List() {
dstFiles[dst.Remote()] = dst
}
dstFiles := readFilesMap(fdst)
// Read the source files checking them against dstFiles
// FIXME could do this in parallel and make it use less memory
srcFiles := make(map[string]Object)
srcFiles := readFilesMap(fsrc)
// Move all the common files into commonFiles and delete them
// from srcFiles and dstFiles
commonFiles := make(map[string][]Object)
for src := range fsrc.List() {
remote := src.Remote()
for remote, src := range srcFiles {
if dst, ok := dstFiles[remote]; ok {
commonFiles[remote] = []Object{dst, src}
delete(srcFiles, remote)
delete(dstFiles, remote)
} else {
srcFiles[remote] = src
}
}
@@ -436,6 +465,16 @@ func ListFn(f Fs, fn func(Object)) error {
return nil
}
// mutex for synchronized output
var outMutex sync.Mutex
// Synchronized fmt.Fprintf
func syncFprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
outMutex.Lock()
defer outMutex.Unlock()
return fmt.Fprintf(w, format, a...)
}
// List the Fs to stdout
//
// Shows size and path
@@ -443,7 +482,7 @@ func ListFn(f Fs, fn func(Object)) error {
// Lists in parallel which may get them out of order
func List(f Fs, w io.Writer) error {
return ListFn(f, func(o Object) {
fmt.Fprintf(w, "%9d %s\n", o.Size(), o.Remote())
syncFprintf(w, "%9d %s\n", o.Size(), o.Remote())
})
}
@@ -457,7 +496,7 @@ func ListLong(f Fs, w io.Writer) error {
Stats.Checking(o)
modTime := o.ModTime()
Stats.DoneChecking(o)
fmt.Fprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
})
}
@@ -475,14 +514,14 @@ func Md5sum(f Fs, w io.Writer) error {
Debug(o, "Failed to read MD5: %v", err)
md5sum = "UNKNOWN"
}
fmt.Fprintf(w, "%32s %s\n", md5sum, o.Remote())
syncFprintf(w, "%32s %s\n", md5sum, o.Remote())
})
}
// List the directories/buckets/containers in the Fs to stdout
func ListDir(f Fs, w io.Writer) error {
for dir := range f.ListDir() {
fmt.Fprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
}
return nil
}


@@ -34,6 +34,7 @@ var (
flocal, fremote fs.Fs
RemoteName = flag.String("remote", "", "Remote to test with, defaults to local filesystem")
SubDir = flag.Bool("subdir", false, "Set to test with a sub directory")
Verbose = flag.Bool("verbose", false, "Set to enable logging")
finalise func()
)
@@ -62,8 +63,8 @@ var t3 = fstest.Time("2011-12-30T12:59:59.000000000Z")
func TestInit(t *testing.T) {
fs.LoadConfig()
fs.Config.Verbose = false
fs.Config.Quiet = true
fs.Config.Verbose = *Verbose
fs.Config.Quiet = !*Verbose
var err error
fremote, finalise, err = fstest.RandomRemote(*RemoteName, *SubDir)
if err != nil {


@@ -1,39 +0,0 @@
package fs
import (
"io"
"os"
)
// SeekWrapper wraps an io.Reader with a basic Seek method which
// returns the Size attribute.
//
// This is used for google.golang.org/api/googleapi/googleapi.go
// to detect the length (see getReaderSize function)
//
// Without this the getReaderSize function reads the entire file into
// memory to find its length.
type SeekWrapper struct {
In io.Reader
Size int64
}
// Read bytes from the object - see io.Reader
func (file *SeekWrapper) Read(p []byte) (n int, err error) {
return file.In.Read(p)
}
// Seek - minimal implementation for Google API length detection
func (file *SeekWrapper) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_CUR:
return 0, nil
case os.SEEK_END:
return file.Size, nil
}
return 0, nil
}
// Interfaces that SeekWrapper implements
var _ io.Reader = (*SeekWrapper)(nil)
var _ io.Seeker = (*SeekWrapper)(nil)


@@ -1,3 +1,3 @@
package fs
const Version = "v1.09"
const Version = "v1.13"


@@ -109,7 +109,7 @@ func TestFsListEmpty(t *testing.T) {
func TestFsListDirEmpty(t *testing.T) {
skipIfNotOk(t)
for obj := range remote.ListDir() {
t.Error("Found unexpected item %q", obj.Name)
t.Errorf("Found unexpected item %q", obj.Name)
}
}
@@ -174,7 +174,7 @@ func TestFsListDirRoot(t *testing.T) {
skipIfNotOk(t)
rootRemote, err := fs.NewFs(RemoteName)
if err != nil {
t.Fatal("Failed to make remote %q: %v", RemoteName, err)
t.Fatalf("Failed to make remote %q: %v", RemoteName, err)
}
found := false
for obj := range rootRemote.ListDir() {
@@ -191,7 +191,7 @@ func TestFsListRoot(t *testing.T) {
skipIfNotOk(t)
rootRemote, err := fs.NewFs(RemoteName)
if err != nil {
t.Fatal("Failed to make remote %q: %v", RemoteName, err)
t.Fatalf("Failed to make remote %q: %v", RemoteName, err)
}
// Should either find file1 and file2 or nothing
found1 := false
@@ -384,7 +384,7 @@ func TestLimitedFs(t *testing.T) {
file2Copy.Path = "z.txt"
fileRemote, err := fs.NewFs(remoteName)
if err != nil {
t.Fatal("Failed to make remote %q: %v", remoteName, err)
t.Fatalf("Failed to make remote %q: %v", remoteName, err)
}
fstest.CheckListing(t, fileRemote, []fstest.Item{file2Copy})
_, ok := fileRemote.(*fs.Limited)
@@ -398,7 +398,7 @@ func TestLimitedFsNotFound(t *testing.T) {
remoteName := subRemoteName + "/not found.txt"
fileRemote, err := fs.NewFs(remoteName)
if err != nil {
t.Fatal("Failed to make remote %q: %v", remoteName, err)
t.Fatalf("Failed to make remote %q: %v", remoteName, err)
}
fstest.CheckListing(t, fileRemote, []fstest.Item{})
_, ok := fileRemote.(*fs.Limited)


@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log"
"net/http"
"code.google.com/p/goauth2/oauth"
"github.com/ncw/rclone/fs"
@@ -82,7 +81,7 @@ func (auth *Auth) newTransport(name string) (*oauth.Transport, error) {
t := &oauth.Transport{
Config: config,
Transport: http.DefaultTransport,
Transport: fs.Config.Transport(),
}
return t, nil

View File

@@ -17,7 +17,6 @@ import (
"encoding/hex"
"fmt"
"io"
"mime"
"net/http"
"path"
"regexp"
@@ -360,7 +359,6 @@ func (f *FsStorage) ListDir() fs.DirChan {
func (f *FsStorage) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) {
// Temporary FsObject under construction
o := &FsObjectStorage{storage: f, remote: remote}
in = &fs.SeekWrapper{In: in, Size: size}
return o, o.Update(in, modTime, size)
}
@@ -549,21 +547,14 @@ func (o *FsObjectStorage) Open() (in io.ReadCloser, err error) {
//
// The new object may have been created if an error is returned
func (o *FsObjectStorage) Update(in io.Reader, modTime time.Time, size int64) error {
// Guess the content type
contentType := mime.TypeByExtension(path.Ext(o.remote))
if contentType == "" {
contentType = "application/octet-stream"
}
object := storage.Object{
Bucket: o.storage.bucket,
Name: o.storage.root + o.remote,
ContentType: contentType,
ContentType: fs.MimeType(o),
Size: uint64(size),
Updated: modTime.Format(timeFormatOut), // Doesn't get set
Metadata: metadataFromModTime(modTime),
}
in = &fs.SeekWrapper{In: in, Size: size}
newObject, err := o.storage.svc.Objects.Insert(o.storage.bucket, &object).Media(in).Name(object.Name).PredefinedAcl(o.storage.objectAcl).Do()
if err != nil {
return err


@@ -19,30 +19,23 @@ Todo
* if object.PseudoDirectory {
* fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote())
* Make Account wrapper
* limit bandwidth for a pool of all individual connections
* do timeouts by setting a limit, seeing whether io has happened
and resetting it if it has
* make Account do progress meter
* Make logging controllable with flags (mostly done)
* -timeout: Make all timeouts be settable with command line parameters
* Windows paths? Do we need to translate / and \?
* Make a fs.Errorf and count errors and log them at a different level
* Add max object size to fs metadata - 5GB for swift, infinite for local, ? for s3
* tie into -max-size flag
* FIXME Make NewFs to return err.IsAnObject so can put the LimitedFs
creation in common code? Or try for as much as possible?
* FIXME Account all the transactions (ls etc) using a different
Roundtripper wrapper which wraps the transactions?
* FIXME write tests for local file system
* FIXME implement tests for single file operations in rclonetest
* Need to make directory objects otherwise can't upload an empty directory
* Or could upload empty directories only?
More rsync features
* include
* exclude
* max size
* bandwidth limit
* -c, --checksum skip based on checksum, not mod-time & size
Ideas for flags
* --retries N flag which would make rclone retry a sync until successful or it tried N times.
Ideas
* could do encryption - put IV into metadata?
@@ -55,13 +48,6 @@ Ideas
* control times sync (which is slow with some remotes) with -a --archive flag?
* Copy a glob pattern - could do with LimitedFs
s3
* Can maybe set last modified?
* https://forums.aws.amazon.com/message.jspa?messageID=214062
* Otherwise can set metadata
* Returns etag and last modified in bucket list
Bugs
* Non verbose - not sure number transferred got counted up? CHECK
* When doing copy it recurses the whole of the destination FS which isn't necessary


@@ -28,8 +28,9 @@ import (
var (
// Flags
cpuprofile = pflag.StringP("cpuprofile", "", "", "Write cpu profile to file")
statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats")
statsInterval = pflag.DurationP("stats", "", time.Minute*1, "Interval to print stats (0 to disable)")
version = pflag.BoolP("version", "V", false, "Print the version number")
logFile = pflag.StringP("log-file", "", "", "Log everything to this file")
)
type Command struct {
@@ -58,10 +59,10 @@ func (cmd *Command) checkArgs(args []string) {
var Commands = []Command{
{
Name: "copy",
ArgsHelp: "source://path dest://path",
ArgsHelp: "source:path dest:path",
Help: `
Copy the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
unchanged files, testing by size and modification time or
MD5SUM. Doesn't delete files from the destination.`,
Run: func(fdst, fsrc fs.Fs) {
err := fs.Sync(fdst, fsrc, false)
@@ -74,13 +75,13 @@ var Commands = []Command{
},
{
Name: "sync",
ArgsHelp: "source://path dest://path",
ArgsHelp: "source:path dest:path",
Help: `
Sync the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
MD5SUM. Deletes any files that exist in source that don't
exist in destination. Since this can cause data loss, test
first with the --dry-run flag.`,
Sync the source to the destination, changing the destination
only. Doesn't transfer unchanged files, testing by size and
modification time or MD5SUM. Destination is updated to match
source, including deleting files if necessary. Since this can
cause data loss, test first with the --dry-run flag.`,
Run: func(fdst, fsrc fs.Fs) {
err := fs.Sync(fdst, fsrc, true)
if err != nil {
@@ -92,7 +93,7 @@ var Commands = []Command{
},
{
Name: "ls",
ArgsHelp: "[remote://path]",
ArgsHelp: "[remote:path]",
Help: `
List all the objects in the path with size and path.`,
Run: func(fdst, fsrc fs.Fs) {
@@ -106,7 +107,7 @@ var Commands = []Command{
},
{
Name: "lsd",
ArgsHelp: "[remote://path]",
ArgsHelp: "[remote:path]",
Help: `
List all directories/containers/buckets in the path.`,
Run: func(fdst, fsrc fs.Fs) {
@@ -120,9 +121,10 @@ var Commands = []Command{
},
{
Name: "lsl",
ArgsHelp: "[remote://path]",
ArgsHelp: "[remote:path]",
Help: `
List all the objects in the path with modification time, size and path.`,
List all the objects in the path with modification time,
size and path.`,
Run: func(fdst, fsrc fs.Fs) {
err := fs.ListLong(fdst, os.Stdout)
if err != nil {
@@ -134,9 +136,10 @@ var Commands = []Command{
},
{
Name: "md5sum",
ArgsHelp: "[remote://path]",
ArgsHelp: "[remote:path]",
Help: `
Produces an md5sum file for all the objects in the path.`,
Produces an md5sum file for all the objects in the path. This
is in the same format as the standard md5sum tool produces.`,
Run: func(fdst, fsrc fs.Fs) {
err := fs.Md5sum(fdst, os.Stdout)
if err != nil {
@@ -148,7 +151,7 @@ var Commands = []Command{
},
{
Name: "mkdir",
ArgsHelp: "remote://path",
ArgsHelp: "remote:path",
Help: `
Make the path if it doesn't already exist`,
Run: func(fdst, fsrc fs.Fs) {
@@ -162,7 +165,7 @@ var Commands = []Command{
},
{
Name: "rmdir",
ArgsHelp: "remote://path",
ArgsHelp: "remote:path",
Help: `
Remove the path. Note that you can't remove a path with
objects in it, use purge for that.`,
@@ -177,7 +180,7 @@ var Commands = []Command{
},
{
Name: "purge",
ArgsHelp: "remote://path",
ArgsHelp: "remote:path",
Help: `
Remove the path and all of its contents.`,
Run: func(fdst, fsrc fs.Fs) {
@@ -191,7 +194,7 @@ var Commands = []Command{
},
{
Name: "check",
ArgsHelp: "source://path dest://path",
ArgsHelp: "source:path dest:path",
Help: `
Checks the files in the source and destination match. It
compares sizes and MD5SUMs and prints a report of files which
@@ -240,7 +243,8 @@ Subcommands:
fmt.Fprintf(os.Stderr, "Options:\n")
pflag.PrintDefaults()
fmt.Fprintf(os.Stderr, `
It is only necessary to use a unique prefix of the subcommand, eg 'up' for 'upload'.
It is only necessary to use a unique prefix of the subcommand, eg 'up'
for 'upload'.
`)
}
@@ -323,6 +327,9 @@ func NewFs(remote string) fs.Fs {
// Print the stats every statsInterval
func StartStats() {
if *statsInterval <= 0 {
return
}
go func() {
ch := time.Tick(*statsInterval)
for {
@@ -340,6 +347,17 @@ func main() {
}
command, args := ParseCommand()
// Log file output
if *logFile != "" {
f, err := os.OpenFile(*logFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
if err != nil {
log.Fatalf("Failed to open log file: %v", err)
}
f.Seek(0, os.SEEK_END)
log.SetOutput(f)
redirectStderr(f)
}
// Make source and destination fs
var fdst, fsrc fs.Fs
if len(args) >= 1 {
@@ -360,10 +378,10 @@ func main() {
if command.Run != nil {
command.Run(fdst, fsrc)
if !command.NoStats {
fmt.Println(fs.Stats)
fmt.Fprintln(os.Stderr, fs.Stats)
}
if fs.Config.Verbose {
log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine())
fs.Debug(nil, "Go routines at exit %d\n", runtime.NumGoroutine())
}
if fs.Stats.Errored() {
os.Exit(1)

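For orientation, here is a minimal stand-alone sketch of the --log-file wiring above; the path and helper name are illustrative only, not rclone API, and the platform-specific redirectStderr implementations follow in the next three files:

// Sketch, not rclone code: open the log in append mode, point the
// standard logger at it, and leave OS-level stderr redirection to a
// platform-specific helper so panic traces are captured too.
package main

import (
    "log"
    "os"
)

func setupLogFile(path string) (*os.File, error) {
    f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640)
    if err != nil {
        return nil, err
    }
    // Make sure subsequent writes land after any existing content.
    if _, err := f.Seek(0, os.SEEK_END); err != nil {
        return nil, err
    }
    log.SetOutput(f)
    return f, nil
}

func main() {
    if _, err := setupLogFile("rclone.log"); err != nil { // illustrative path
        log.Fatalf("Failed to open log file: %v", err)
    }
    log.Println("this line lands in rclone.log")
}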
redirect_stderr.go Normal file
View File

@@ -0,0 +1,15 @@
// Log the panic to the log file - for OSes which can't do this
//+build !windows,!unix
package main
import (
"log"
"os"
)
// redirectStderr to the file passed in
func redirectStderr(f *os.File) {
log.Printf("Can't redirect stderr to file")
}

redirect_stderr_unix.go Normal file
View File

@@ -0,0 +1,19 @@
// Log the panic under unix to the log file
//+build unix
package main
import (
"log"
"os"
"syscall"
)
// redirectStderr to the file passed in
func redirectStderr(f *os.File) {
err := syscall.Dup2(int(f.Fd()), int(os.Stderr.Fd()))
if err != nil {
log.Fatalf("Failed to redirect stderr to file: %v", err)
}
}
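A toy demonstration of what the Dup2 call achieves (sketch only; the path is illustrative, and syscall.Dup2 exists only on platforms that provide dup2):

// After Dup2, fd 2 and the file share one open file description, so even
// output that bypasses the log package (such as panic traces) reaches the
// file.
package main

import (
    "fmt"
    "os"
    "syscall"
)

func main() {
    f, err := os.Create("/tmp/stderr-demo.log") // illustrative path
    if err != nil {
        panic(err)
    }
    if err := syscall.Dup2(int(f.Fd()), int(os.Stderr.Fd())); err != nil {
        panic(err)
    }
    fmt.Fprintln(os.Stderr, "this line ends up in /tmp/stderr-demo.log")
}

As an aside, 'unix' was not a build tag the go tool set automatically at the time (it matched only if passed explicitly via -tags), so on a stock build the fallback file above, rather than this one, is what would compile on Linux and friends.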

redirect_stderr_windows.go Normal file
View File

@@ -0,0 +1,39 @@
// Log the panic under windows to the log file
//
// Code from minix, via
//
// http://play.golang.org/p/kLtct7lSUg
//+build windows
package main
import (
"log"
"os"
"syscall"
)
var (
kernel32 = syscall.MustLoadDLL("kernel32.dll")
procSetStdHandle = kernel32.MustFindProc("SetStdHandle")
)
func setStdHandle(stdhandle int32, handle syscall.Handle) error {
r0, _, e1 := syscall.Syscall(procSetStdHandle.Addr(), 2, uintptr(stdhandle), uintptr(handle), 0)
if r0 == 0 {
if e1 != 0 {
return error(e1)
}
return syscall.EINVAL
}
return nil
}
// redirectStderr to the file passed in
func redirectStderr(f *os.File) {
err := setStdHandle(syscall.STD_ERROR_HANDLE, syscall.Handle(f.Fd()))
if err != nil {
log.Fatalf("Failed to redirect stderr to file: %v", err)
}
}
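Design note: Windows has no dup2 equivalent in Go's syscall package, so instead of duplicating a file descriptor the code installs the log file's handle as the process-wide standard error handle via SetStdHandle; the intent is the same as on Unix - writes to stderr, including panic traces, should end up in the log file.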

View File

@@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"mime"
"net/http"
"path"
"regexp"
@@ -101,7 +100,8 @@ func init() {
// Constants
const (
metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in
metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in
listChunkSize = 1024 // number of items to read at once
)
// FsS3 represents a remote s3 server
@@ -184,6 +184,7 @@ func s3Connection(name string) (*s3.S3, error) {
}
c := s3.New(auth, region)
c.Client = fs.Config.Client()
return c, nil
}
@@ -267,36 +268,46 @@ func (f *FsS3) list(directories bool, fn func(string, *s3.Key)) {
if directories {
delimiter = "/"
}
// FIXME need to implement ALL loop
objects, err := f.b.List(f.root, delimiter, "", 10000)
if err != nil {
fs.Stats.Error()
fs.Log(f, "Couldn't read bucket %q: %s", f.bucket, err)
} else {
rootLength := len(f.root)
if directories {
for _, remote := range objects.CommonPrefixes {
if !strings.HasPrefix(remote, f.root) {
fs.Log(f, "Odd name received %q", remote)
continue
}
remote := remote[rootLength:]
if strings.HasSuffix(remote, "/") {
remote = remote[:len(remote)-1]
}
fn(remote, &s3.Key{Key: remote})
}
marker := ""
for {
objects, err := f.b.List(f.root, delimiter, marker, listChunkSize)
if err != nil {
fs.Stats.Error()
fs.Log(f, "Couldn't read bucket %q: %s", f.bucket, err)
} else {
for i := range objects.Contents {
object := &objects.Contents[i]
if !strings.HasPrefix(object.Key, f.root) {
fs.Log(f, "Odd name received %q", object.Key)
continue
rootLength := len(f.root)
if directories {
for _, remote := range objects.CommonPrefixes {
if !strings.HasPrefix(remote, f.root) {
fs.Log(f, "Odd name received %q", remote)
continue
}
remote := remote[rootLength:]
if strings.HasSuffix(remote, "/") {
remote = remote[:len(remote)-1]
}
fn(remote, &s3.Key{Key: remote})
}
} else {
for i := range objects.Contents {
object := &objects.Contents[i]
if !strings.HasPrefix(object.Key, f.root) {
fs.Log(f, "Odd name received %q", object.Key)
continue
}
remote := object.Key[rootLength:]
fn(remote, object)
}
remote := object.Key[rootLength:]
fn(remote, object)
}
}
if !objects.IsTruncated {
break
}
// Use NextMarker if set, otherwise use last Key
marker = objects.NextMarker
if marker == "" {
marker = objects.Contents[len(objects.Contents)-1].Key
}
}
}
@@ -409,9 +420,17 @@ func (o *FsObjectS3) Remote() string {
return o.remote
}
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
// Md5sum returns the Md5sum of an object returning a lowercase hex string
func (o *FsObjectS3) Md5sum() (string, error) {
return strings.Trim(strings.ToLower(o.etag), `"`), nil
etag := strings.Trim(strings.ToLower(o.etag), `"`)
// Check the etag is a valid md5sum
if !matchMd5.MatchString(etag) {
fs.Debug(o, "Invalid md5sum (probably multipart uploaded) - ignoring: %q", etag)
return "", nil
}
return etag, nil
}
// Size returns the size of an object in bytes
@@ -525,13 +544,7 @@ func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error {
metaMtime: swift.TimeToFloatString(modTime),
}
// Guess the content type
contentType := mime.TypeByExtension(path.Ext(o.remote))
if contentType == "" {
contentType = "application/octet-stream"
}
_, err := o.s3.b.PutReaderHeaders(o.s3.root+o.remote, in, size, contentType, o.s3.perm, headers)
_, err := o.s3.b.PutReaderHeaders(o.s3.root+o.remote, in, size, fs.MimeType(o), o.s3.perm, headers)
if err != nil {
return err
}

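The listing change above replaces a single List call capped at 10000 items with a marker-driven loop. A self-contained sketch of that pagination pattern, using illustrative stand-in types rather than the real goamz API:

package main

import "fmt"

// page stands in for one chunk of list results.
type page struct {
    Keys        []string
    IsTruncated bool
    NextMarker  string
}

// fetch returns up to chunk keys strictly after marker (fake data source).
func fetch(all []string, marker string, chunk int) page {
    start := len(all)
    for i, k := range all {
        if k > marker {
            start = i
            break
        }
    }
    end := start + chunk
    if end > len(all) {
        end = len(all)
    }
    return page{Keys: all[start:end], IsTruncated: end < len(all)}
}

func main() {
    all := []string{"a", "b", "c", "d", "e"}
    marker := ""
    for {
        p := fetch(all, marker, 2)
        for _, k := range p.Keys {
            fmt.Println(k)
        }
        if !p.IsTruncated {
            break
        }
        // Prefer the server-supplied NextMarker; otherwise fall back to the
        // last key seen, exactly as the loop above does.
        marker = p.NextMarker
        if marker == "" {
            marker = p.Keys[len(p.Keys)-1]
        }
    }
}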
View File

@@ -47,6 +47,9 @@ func init() {
}, {
Name: "tenant",
Help: "Tenant name - optional",
}, {
Name: "region",
Help: "Region name - optional",
},
// snet = flag.Bool("swift-snet", false, "Use internal service network") // FIXME not implemented
},
@@ -110,11 +113,15 @@ func swiftConnection(name string) (*swift.Connection, error) {
return nil, errors.New("auth not found")
}
c := &swift.Connection{
UserName: userName,
ApiKey: apiKey,
AuthUrl: authUrl,
UserAgent: fs.UserAgent,
Tenant: fs.ConfigFile.MustValue(name, "tenant"),
UserName: userName,
ApiKey: apiKey,
AuthUrl: authUrl,
UserAgent: fs.UserAgent,
Tenant: fs.ConfigFile.MustValue(name, "tenant"),
Region: fs.ConfigFile.MustValue(name, "region"),
ConnectTimeout: 10 * fs.Config.ConnectTimeout, // Use the timeouts in the transport
Timeout: 10 * fs.Config.Timeout, // Use the timeouts in the transport
Transport: fs.Config.Transport(),
}
err := c.Authenticate()
if err != nil {