Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/spf13/cobra"

"github.com/apecloud/datasafed/pkg/storage"
"github.com/apecloud/datasafed/pkg/util"
)

var (
Expand Down Expand Up @@ -66,7 +67,7 @@ datasafed list --name "*.txt" /some/dir/
pflags.BoolVarP(&opts.filesOnly, "files-only", "f", false, "list files only")
pflags.BoolVarP(&opts.recursive, "recursive", "r", false, "list recursively")
pflags.IntVar(&opts.maxDepth, "max-depth", 0, "max depth when listing recursively")
pflags.StringVarP(&opts.sort, "sort", "s", "",
pflags.VarP(util.NewEnumVar(validSorts, &opts.sort), "sort", "s",
fmt.Sprintf("sort by which field, choices: %q, this option conflicts with --recursive", validSorts))
pflags.BoolVar(&opts.reverse, "reverse", false, "reverse order")
pflags.Int64Var(&opts.newer, "newer-than", 0,
Expand All @@ -75,7 +76,7 @@ datasafed list --name "*.txt" /some/dir/
"list only entries whose last modification time is older than the specified unix timestamp (exclusive)")
pflags.StringVar(&opts.namePattern, "name", "",
"list only entries whose name matches the specified pattern (https://pkg.go.dev/path/filepath#Match)")
pflags.StringVarP(&opts.format, "output-format", "o", "short",
pflags.VarP(util.NewEnumVar(validOutputFormats, &opts.format).Default("short"), "output-format", "o",
fmt.Sprintf("output format, choices: %q", validOutputFormats))

cmd.MarkFlagsMutuallyExclusive("dirs-only", "files-only")
Expand All @@ -85,12 +86,6 @@ datasafed list --name "*.txt" /some/dir/
}

func doList(opts *listOptions, cmd *cobra.Command, args []string) {
if opts.sort != "" && !slices.Contains(validSorts, opts.sort) {
exitIfError(fmt.Errorf("invalid sort: %q", opts.sort))
}
if !slices.Contains(validOutputFormats, opts.format) {
exitIfError(fmt.Errorf("invalid output format: %q", opts.format))
}
bufStdout := bufio.NewWriterSize(os.Stdout, 8*1024)
filter := getFilterFn(opts)
printer := getPrinter(opts, bufStdout)
Expand Down
47 changes: 44 additions & 3 deletions cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ import (
"path/filepath"
"strings"

"github.com/kopia/kopia/repo/compression"
"github.com/spf13/cobra"

"github.com/apecloud/datasafed/pkg/util"
)

// pullOptions holds the flag values for the "pull" command.
type pullOptions struct {
	// decompression names the algorithm used to decompress the pulled
	// data; the empty string means the data is written as-is.
	decompression string
}

func init() {
opts := &pullOptions{}
cmd := &cobra.Command{
Use: "pull rpath lpath",
Short: "Pull remote file",
Expand All @@ -23,17 +31,24 @@ datasafed pull some/path/file.txt /tmp/file.txt
datasafed pull some/path/file.txt - | wc -l
`),
Args: cobra.ExactArgs(2),
Run: doPull,
Run: func(cmd *cobra.Command, args []string) {
doPull(opts, cmd, args)
},
}
pflags := cmd.PersistentFlags()
pflags.VarP(util.NewEnumVar(validCompressionAlgorithms, &opts.decompression), "decompress", "d",
fmt.Sprintf("decompress the pulled file using the specified algorithm, choices: %q", validCompressionAlgorithms))
rootCmd.AddCommand(cmd)
}

func doPull(cmd *cobra.Command, args []string) {
func doPull(opts *pullOptions, cmd *cobra.Command, args []string) {
rpath := args[0]
lpath := args[1]
var out io.Writer
var flush func() error
if lpath == "-" {
out = os.Stdout
flush = func() error { return nil }
} else {
if lpath == "" || strings.HasSuffix(lpath, "/") {
exitIfError(fmt.Errorf("invalid local path \"%s\"", lpath))
Expand All @@ -47,12 +62,38 @@ func doPull(cmd *cobra.Command, args []string) {
exitIfError(err)
f, err := os.OpenFile(lpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
exitIfError(err)
defer f.Close()
out = f
flush = func() error { return f.Close() }
}
if opts.decompression != "" {
c, ok := compression.ByName[compression.Name(opts.decompression)]
if !ok {
exitIfError(fmt.Errorf("bug: compressor for %s is not found", opts.decompression))
}
pr, pw := io.Pipe()
ch := make(chan error, 1)
go func(out io.Writer) {
err := c.Decompress(out, pr, false)
pr.CloseWithError(err)
ch <- err
}(out)
out = pw
originalFlush := flush
flush = func() error {
pw.Close() // reach EOF
err := <-ch
if err != nil {
return err
}
return originalFlush()
}
}
err := globalStorage.Pull(appCtx, rpath, out)
if err != nil {
err = fmt.Errorf("pull %q: %w", rpath, err)
}
if ferr := flush(); err == nil {
err = ferr
}
exitIfError(err)
}
47 changes: 45 additions & 2 deletions cmd/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,28 @@ import (
"fmt"
"io"
"os"
"sort"
"strings"

"github.com/kopia/kopia/repo/compression"
"github.com/spf13/cobra"

"github.com/apecloud/datasafed/pkg/util"
)

// from https://github.com/kopia/kopia/blob/a934629c55f5a04a1496b58708bf44df1f7b6690/repo/compression/compressor.go#L13
const compressionHeaderSize = 4

var (
	// validCompressionAlgorithms lists the accepted values for the
	// --compress / --decompress flags, sorted alphabetically.
	validCompressionAlgorithms = getValidCompressionAlgorithms()
)

// pushOptions holds the flag values for the "push" command.
type pushOptions struct {
	// compression names the algorithm used to compress the data before
	// pushing; the empty string means no compression is applied.
	compression string
}

func init() {
opts := &pushOptions{}
cmd := &cobra.Command{
Use: "push lpath rpath",
Short: "Push file to remote",
Expand All @@ -22,12 +38,26 @@ datasafed push local/path/a.txt remote/path/a.txt
datasafed push - remote/path/somefile.txt
`),
Args: cobra.ExactArgs(2),
Run: doPush,
Run: func(cmd *cobra.Command, args []string) {
doPush(opts, cmd, args)
},
}
pflags := cmd.PersistentFlags()
pflags.VarP(util.NewEnumVar(validCompressionAlgorithms, &opts.compression), "compress", "z",
fmt.Sprintf("compress the file using the specified algorithm before sending it to remote, choices: %q", validCompressionAlgorithms))
rootCmd.AddCommand(cmd)
}

func doPush(cmd *cobra.Command, args []string) {
// getValidCompressionAlgorithms returns the names of all algorithms
// registered in the kopia compression package, sorted alphabetically so
// that help output and validation messages are stable across runs.
func getValidCompressionAlgorithms() []string {
	// Pre-size the slice: the final length is exactly len(compression.ByName).
	names := make([]string, 0, len(compression.ByName))
	for name := range compression.ByName {
		names = append(names, string(name))
	}
	sort.Strings(names)
	return names
}

func doPush(opts *pushOptions, cmd *cobra.Command, args []string) {
lpath := args[0]
rpath := args[1]
var in io.Reader
Expand All @@ -39,6 +69,19 @@ func doPush(cmd *cobra.Command, args []string) {
defer f.Close()
in = f
}
if opts.compression != "" {
c, ok := compression.ByName[compression.Name(opts.compression)]
if !ok {
exitIfError(fmt.Errorf("bug: compressor for %s is not found", opts.compression))
}
pr, pw := io.Pipe()
go func(r io.Reader) {
// discard compression header
err := c.Compress(util.DiscardN(pw, compressionHeaderSize), r)
pw.CloseWithError(err)
}(in)
in = pr
}
err := globalStorage.Push(appCtx, in, rpath)
if err != nil {
err = fmt.Errorf("push to %q: %w", rpath, err)
Expand Down
3 changes: 2 additions & 1 deletion docs/datasafed_pull.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ datasafed pull some/path/file.txt - | wc -l
### Options

```
-h, --help help for pull
-d, --decompress string decompress the pulled file using the specified algorithm, choices: ["deflate-best-compression" "deflate-best-speed" "deflate-default" "gzip" "gzip-best-compression" "gzip-best-speed" "lz4" "pgzip" "pgzip-best-compression" "pgzip-best-speed" "s2-better" "s2-default" "s2-parallel-4" "s2-parallel-8" "zstd" "zstd-best-compression" "zstd-better-compression" "zstd-fastest"]
-h, --help help for pull
```

### Options inherited from parent commands
Expand Down
3 changes: 2 additions & 1 deletion docs/datasafed_push.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ datasafed push - remote/path/somefile.txt
### Options

```
-h, --help help for push
-z, --compress string compress the file using the specified algorithm before sending it to remote, choices: ["deflate-best-compression" "deflate-best-speed" "deflate-default" "gzip" "gzip-best-compression" "gzip-best-speed" "lz4" "pgzip" "pgzip-best-compression" "pgzip-best-speed" "s2-better" "s2-default" "s2-parallel-4" "s2-parallel-8" "zstd" "zstd-best-compression" "zstd-better-compression" "zstd-fastest"]
-h, --help help for push
```

### Options inherited from parent commands
Expand Down
51 changes: 4 additions & 47 deletions pkg/logging/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/apecloud/datasafed/pkg/util"
)

const logsDirMode = 0o700
Expand Down Expand Up @@ -48,10 +50,10 @@ func (c *loggingFlags) setup(cmd *cobra.Command) {
f.DurationVar(&c.logDirMaxAge, "log-dir-max-age", 720*time.Hour, "Maximum age of log files to retain")
f.Float64Var(&c.logDirMaxTotalSizeMB, "log-dir-max-total-size-mb", 1000, "Maximum total size of log files to retain")
f.IntVar(&c.logFileMaxSegmentSize, "max-log-file-segment-size", 50000000, "Maximum size of a single log file segment")
f.Var(newEnum(logLevels, &c.logLevel).Default("info"), "log-level", "Console log level")
f.Var(util.NewEnumVar(logLevels, &c.logLevel).Default("info"), "log-level", "Console log level")
f.BoolVar(&c.jsonLogConsole, "json-log-console", false, "JSON log file")
f.BoolVar(&c.jsonLogFile, "json-log-file", false, "JSON log file")
f.Var(newEnum(logLevels, &c.fileLogLevel).Default("debug"), "file-log-level", "File log level")
f.Var(util.NewEnumVar(logLevels, &c.fileLogLevel).Default("debug"), "file-log-level", "File log level")
f.BoolVar(&c.fileLogLocalTimezone, "file-log-local-tz", false, "When logging to a file, use local timezone")
f.BoolVar(&c.forceColor, "force-color", false, "Force color output")
f.BoolVar(&c.disableColor, "disable-color", false, "Disable color output")
Expand Down Expand Up @@ -446,48 +448,3 @@ func (w *onDemandFile) Write(b []byte) (int, error) {
//nolint:wrapcheck
return n, err
}

// enum is a pflag-compatible flag value that only accepts strings from a
// fixed list of choices.
type enum struct {
	Allowed  []string
	variable *string
}

// newEnum builds an enum flag value backed by variable, restricted to the
// allowed choices. It panics when variable is nil.
func newEnum(allowed []string, variable *string) *enum {
	if variable == nil {
		panic("variable should not be nil")
	}
	return &enum{Allowed: allowed, variable: variable}
}

// Default stores d as the flag's initial value and returns the enum for
// call chaining.
func (e *enum) Default(d string) *enum {
	*e.variable = d
	return e
}

// String reports the currently selected value.
func (e enum) String() string {
	return *e.variable
}

// Set accepts p only when it is one of the allowed choices; otherwise it
// returns an error listing the valid options.
func (e *enum) Set(p string) error {
	for _, choice := range e.Allowed {
		if choice == p {
			*e.variable = p
			return nil
		}
	}
	return fmt.Errorf("%s is not included in %s", p, strings.Join(e.Allowed, ","))
}

// Type identifies the flag's value type for help text.
func (e *enum) Type() string {
	return "string"
}
37 changes: 37 additions & 0 deletions pkg/util/discard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package util

import (
"io"
"sync"
)

// discardN wraps an io.Writer and silently drops the first n bytes
// written to it, forwarding everything after that prefix to out.
type discardN struct {
	mu      sync.Mutex // guards skipped, in case writers race
	out     io.Writer
	n       int // total number of leading bytes to discard
	skipped int // bytes discarded so far
}

// Write implements io.Writer. It discards bytes until n bytes in total
// have been skipped, then forwards the remainder of p (and all later
// writes) to the underlying writer. The returned count includes the
// discarded bytes, so callers see the whole input as consumed.
func (d *discardN) Write(p []byte) (int, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	remaining := d.n - d.skipped
	if remaining <= 0 {
		// Prefix already discarded: pass everything straight through.
		return d.out.Write(p)
	}
	if len(p) <= remaining {
		// The whole chunk falls inside the prefix to discard.
		d.skipped += len(p)
		return len(p), nil
	}
	// Discard the rest of the prefix and forward the tail of p.
	d.skipped += remaining
	n, err := d.out.Write(p[remaining:])
	return n + remaining, err
}

// DiscardN returns an io.Writer that drops the first n bytes written to
// it and passes every subsequent byte through to out.
func DiscardN(out io.Writer, n int) io.Writer {
	return &discardN{
		out: out,
		n:   n,
	}
}
51 changes: 51 additions & 0 deletions pkg/util/enumvar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package util

import (
	"fmt"
	"slices"
	"strings"
)

// EnumVar is a pflag.Value implementation that restricts a string flag
// to a fixed set of allowed values.
type EnumVar struct {
	Allowed  []string
	variable *string
}

// NewEnumVar returns an EnumVar that accepts only the allowed values and
// stores the selected value into variable. It panics if variable is nil.
func NewEnumVar(allowed []string, variable *string) *EnumVar {
	if variable == nil {
		panic("variable should not be nil")
	}
	return &EnumVar{
		Allowed:  allowed,
		variable: variable,
	}
}

// Default sets the initial value of the flag and returns the receiver
// for call chaining.
func (a *EnumVar) Default(d string) *EnumVar {
	*a.variable = d
	return a
}

// String returns the currently selected value.
// Pointer receiver keeps the method set consistent with Set and Type.
func (a *EnumVar) String() string {
	return *a.variable
}

// Set validates p against the allowed values and stores it on success;
// it returns an error when p is not one of the allowed values.
func (a *EnumVar) Set(p string) error {
	if !slices.Contains(a.Allowed, p) {
		return fmt.Errorf("%s is not included in %s", p, strings.Join(a.Allowed, ","))
	}
	*a.variable = p
	return nil
}

// Type describes the flag's value type for help output.
func (a *EnumVar) Type() string {
	return "string"
}