From 79b5e2d750a03a9b5893a08583f1debbe2b8862b Mon Sep 17 00:00:00 2001 From: "x.zhou" Date: Tue, 2 Jan 2024 15:13:53 +0800 Subject: [PATCH] feat: support compression --- cmd/list.go | 11 +++------ cmd/pull.go | 47 +++++++++++++++++++++++++++++++++++--- cmd/push.go | 47 ++++++++++++++++++++++++++++++++++++-- docs/datasafed_pull.md | 3 ++- docs/datasafed_push.md | 3 ++- pkg/logging/logfile.go | 51 ++++-------------------------------------- pkg/util/discard.go | 37 ++++++++++++++++++++++++++++++ pkg/util/enumvar.go | 51 ++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 188 insertions(+), 62 deletions(-) create mode 100644 pkg/util/discard.go create mode 100644 pkg/util/enumvar.go diff --git a/cmd/list.go b/cmd/list.go index c172b9c..2a95cc5 100644 --- a/cmd/list.go +++ b/cmd/list.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/cobra" "github.com/apecloud/datasafed/pkg/storage" + "github.com/apecloud/datasafed/pkg/util" ) var ( @@ -66,7 +67,7 @@ datasafed list --name "*.txt" /some/dir/ pflags.BoolVarP(&opts.filesOnly, "files-only", "f", false, "list files only") pflags.BoolVarP(&opts.recursive, "recursive", "r", false, "list recursively") pflags.IntVar(&opts.maxDepth, "max-depth", 0, "max depth when listing recursively") - pflags.StringVarP(&opts.sort, "sort", "s", "", + pflags.VarP(util.NewEnumVar(validSorts, &opts.sort), "sort", "s", fmt.Sprintf("sort by which field, choices: %q, this option conflicts with --recursive", validSorts)) pflags.BoolVar(&opts.reverse, "reverse", false, "reverse order") pflags.Int64Var(&opts.newer, "newer-than", 0, @@ -75,7 +76,7 @@ datasafed list --name "*.txt" /some/dir/ "list only entries whose last modification time is older than the specified unix timestamp (exclusive)") pflags.StringVar(&opts.namePattern, "name", "", "list only entries whose name matches the specified pattern (https://pkg.go.dev/path/filepath#Match)") - pflags.StringVarP(&opts.format, "output-format", "o", "short", + pflags.VarP(util.NewEnumVar(validOutputFormats, &opts.format).Default("short"), "output-format", "o", fmt.Sprintf("output format, choices: %q", validOutputFormats)) cmd.MarkFlagsMutuallyExclusive("dirs-only", "files-only") @@ -85,12 +86,6 @@ datasafed list --name "*.txt" /some/dir/ } func doList(opts *listOptions, cmd *cobra.Command, args []string) { - if opts.sort != "" && !slices.Contains(validSorts, opts.sort) { - exitIfError(fmt.Errorf("invalid sort: %q", opts.sort)) - } - if !slices.Contains(validOutputFormats, opts.format) { - exitIfError(fmt.Errorf("invalid output format: %q", opts.format)) - } bufStdout := bufio.NewWriterSize(os.Stdout, 8*1024) filter := getFilterFn(opts) printer := getPrinter(opts, bufStdout) diff --git a/cmd/pull.go b/cmd/pull.go index 4c1e9e3..fc58fcd 100644 --- a/cmd/pull.go +++ b/cmd/pull.go @@ -7,10 +7,18 @@ import ( "path/filepath" "strings" + "github.com/kopia/kopia/repo/compression" "github.com/spf13/cobra" + + "github.com/apecloud/datasafed/pkg/util" ) +type pullOptions struct { + decompression string +} + func init() { + opts := &pullOptions{} cmd := &cobra.Command{ Use: "pull rpath lpath", Short: "Pull remote file", @@ -23,17 +31,24 @@ datasafed pull some/path/file.txt /tmp/file.txt datasafed pull some/path/file.txt - | wc -l `), Args: cobra.ExactArgs(2), - Run: doPull, + Run: func(cmd *cobra.Command, args []string) { + doPull(opts, cmd, args) + }, } + pflags := cmd.PersistentFlags() + pflags.VarP(util.NewEnumVar(validCompressionAlgorithms, &opts.decompression), "decompress", "d", + fmt.Sprintf("decompress the pulled file using the specified algorithm, choices: %q", validCompressionAlgorithms)) rootCmd.AddCommand(cmd) } -func doPull(cmd *cobra.Command, args []string) { +func doPull(opts *pullOptions, cmd *cobra.Command, args []string) { rpath := args[0] lpath := args[1] var out io.Writer + var flush func() error if lpath == "-" { out = os.Stdout + flush = func() error { return nil } } else { if lpath == "" || strings.HasSuffix(lpath, "/") { exitIfError(fmt.Errorf("invalid local path \"%s\"", lpath)) @@ -47,12 +62,38 @@ func doPull(cmd *cobra.Command, args []string) { exitIfError(err) f, err := os.OpenFile(lpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) exitIfError(err) - defer f.Close() out = f + flush = func() error { return f.Close() } + } + if opts.decompression != "" { + c, ok := compression.ByName[compression.Name(opts.decompression)] + if !ok { + exitIfError(fmt.Errorf("bug: compressor for %s is not found", opts.decompression)) + } + pr, pw := io.Pipe() + ch := make(chan error, 1) + go func(out io.Writer) { + err := c.Decompress(out, pr, false) + pr.CloseWithError(err) + ch <- err + }(out) + out = pw + originalFlush := flush + flush = func() error { + pw.Close() // reach EOF + err := <-ch + if err != nil { + return err + } + return originalFlush() + } } err := globalStorage.Pull(appCtx, rpath, out) if err != nil { err = fmt.Errorf("pull %q: %w", rpath, err) } + if ferr := flush(); err == nil { + err = ferr + } exitIfError(err) } diff --git a/cmd/push.go b/cmd/push.go index 0c7c540..7b63bd2 100644 --- a/cmd/push.go +++ b/cmd/push.go @@ -4,12 +4,28 @@ import ( "fmt" "io" "os" + "sort" "strings" + "github.com/kopia/kopia/repo/compression" "github.com/spf13/cobra" + + "github.com/apecloud/datasafed/pkg/util" ) +// from https://github.com/kopia/kopia/blob/a934629c55f5a04a1496b58708bf44df1f7b6690/repo/compression/compressor.go#L13 +const compressionHeaderSize = 4 + +var ( + validCompressionAlgorithms = getValidCompressionAlgorithms() +) + +type pushOptions struct { + compression string +} + func init() { + opts := &pushOptions{} cmd := &cobra.Command{ Use: "push lpath rpath", Short: "Push file to remote", @@ -22,12 +38,26 @@ datasafed push local/path/a.txt remote/path/a.txt datasafed push - remote/path/somefile.txt `), Args: cobra.ExactArgs(2), - Run: doPush, + Run: func(cmd *cobra.Command, args []string) { + doPush(opts, cmd, args) + }, } + pflags := cmd.PersistentFlags() + pflags.VarP(util.NewEnumVar(validCompressionAlgorithms, &opts.compression), "compress", "z", + fmt.Sprintf("compress the file using the specified algorithm before sending it to remote, choices: %q", validCompressionAlgorithms)) rootCmd.AddCommand(cmd) } -func doPush(cmd *cobra.Command, args []string) { +func getValidCompressionAlgorithms() []string { + var names []string + for name := range compression.ByName { + names = append(names, string(name)) + } + sort.Strings(names) + return names +} + +func doPush(opts *pushOptions, cmd *cobra.Command, args []string) { lpath := args[0] rpath := args[1] var in io.Reader @@ -39,6 +69,19 @@ func doPush(cmd *cobra.Command, args []string) { defer f.Close() in = f } + if opts.compression != "" { + c, ok := compression.ByName[compression.Name(opts.compression)] + if !ok { + exitIfError(fmt.Errorf("bug: compressor for %s is not found", opts.compression)) + } + pr, pw := io.Pipe() + go func(r io.Reader) { + // discard compression header + err := c.Compress(util.DiscardN(pw, compressionHeaderSize), r) + pw.CloseWithError(err) + }(in) + in = pr + } err := globalStorage.Push(appCtx, in, rpath) if err != nil { err = fmt.Errorf("push to %q: %w", rpath, err) diff --git a/docs/datasafed_pull.md b/docs/datasafed_pull.md index 7d57ca2..29da41a 100644 --- a/docs/datasafed_pull.md +++ b/docs/datasafed_pull.md @@ -23,7 +23,8 @@ datasafed pull some/path/file.txt - | wc -l ### Options ``` - -h, --help help for pull + -d, --decompress string decompress the pulled file using the specified algorithm, choices: ["deflate-best-compression" "deflate-best-speed" "deflate-default" "gzip" "gzip-best-compression" "gzip-best-speed" "lz4" "pgzip" "pgzip-best-compression" "pgzip-best-speed" "s2-better" "s2-default" "s2-parallel-4" "s2-parallel-8" "zstd" "zstd-best-compression" "zstd-better-compression" "zstd-fastest"] + -h, --help help for pull ``` ### Options inherited from parent commands diff --git a/docs/datasafed_push.md b/docs/datasafed_push.md index 38f0fcf..0e07a9f 100644 --- a/docs/datasafed_push.md +++ b/docs/datasafed_push.md @@ -23,7 +23,8 @@ datasafed push - remote/path/somefile.txt ### Options ``` - -h, --help help for push + -z, --compress string compress the file using the specified algorithm before sending it to remote, choices: ["deflate-best-compression" "deflate-best-speed" "deflate-default" "gzip" "gzip-best-compression" "gzip-best-speed" "lz4" "pgzip" "pgzip-best-compression" "pgzip-best-speed" "s2-better" "s2-default" "s2-parallel-4" "s2-parallel-8" "zstd" "zstd-best-compression" "zstd-better-compression" "zstd-fastest"] + -h, --help help for push ``` ### Options inherited from parent commands diff --git a/pkg/logging/logfile.go b/pkg/logging/logfile.go index 20fbe9d..179bb7c 100644 --- a/pkg/logging/logfile.go +++ b/pkg/logging/logfile.go @@ -16,6 +16,8 @@ import ( "github.com/spf13/cobra" "go.uber.org/zap" "go.uber.org/zap/zapcore" + + "github.com/apecloud/datasafed/pkg/util" ) const logsDirMode = 0o700 @@ -48,10 +50,10 @@ func (c *loggingFlags) setup(cmd *cobra.Command) { f.DurationVar(&c.logDirMaxAge, "log-dir-max-age", 720*time.Hour, "Maximum age of log files to retain") f.Float64Var(&c.logDirMaxTotalSizeMB, "log-dir-max-total-size-mb", 1000, "Maximum total size of log files to retain") f.IntVar(&c.logFileMaxSegmentSize, "max-log-file-segment-size", 50000000, "Maximum size of a single log file segment") - f.Var(newEnum(logLevels, &c.logLevel).Default("info"), "log-level", "Console log level") + f.Var(util.NewEnumVar(logLevels, &c.logLevel).Default("info"), "log-level", "Console log level") f.BoolVar(&c.jsonLogConsole, "json-log-console", false, "JSON log file") f.BoolVar(&c.jsonLogFile, "json-log-file", false, "JSON log file") - f.Var(newEnum(logLevels, &c.fileLogLevel).Default("debug"), "file-log-level", "File log level") + f.Var(util.NewEnumVar(logLevels, &c.fileLogLevel).Default("debug"), "file-log-level", "File log level") f.BoolVar(&c.fileLogLocalTimezone, "file-log-local-tz", false, "When logging to a file, use local timezone") f.BoolVar(&c.forceColor, "force-color", false, "Force color output") f.BoolVar(&c.disableColor, "disable-color", false, "Disable color output") @@ -446,48 +448,3 @@ func (w *onDemandFile) Write(b []byte) (int, error) { //nolint:wrapcheck return n, err } - -type enum struct { - Allowed []string - variable *string -} - -// newEnum give a list of allowed flag parameters -func newEnum(allowed []string, variable *string) *enum { - if variable == nil { - panic("variable should not be nil") - } - return &enum{ - Allowed: allowed, - variable: variable, - } -} - -func (a *enum) Default(d string) *enum { - *a.variable = d - return a -} - -func (a enum) String() string { - return *a.variable -} - -func (a *enum) Set(p string) error { - isIncluded := func(opts []string, val string) bool { - for _, opt := range opts { - if val == opt { - return true - } - } - return false - } - if !isIncluded(a.Allowed, p) { - return fmt.Errorf("%s is not included in %s", p, strings.Join(a.Allowed, ",")) - } - *a.variable = p - return nil -} - -func (a *enum) Type() string { - return "string" -} diff --git a/pkg/util/discard.go b/pkg/util/discard.go new file mode 100644 index 0000000..afcb60b --- /dev/null +++ b/pkg/util/discard.go @@ -0,0 +1,37 @@ +package util + +import ( + "io" + "sync" +) + +type discardN struct { + mu sync.Mutex + out io.Writer + n int + skipped int +} + +func (d *discardN) Write(p []byte) (int, error) { + d.mu.Lock() + defer d.mu.Unlock() + rest := d.n - d.skipped + if rest > 0 { + if len(p) > rest { + d.skipped += rest + n, err := d.out.Write(p[rest:]) + return n + rest, err + } else { + d.skipped += len(p) + return len(p), nil + } + } + return d.out.Write(p) +} + +func DiscardN(out io.Writer, n int) io.Writer { + return &discardN{ + out: out, + n: n, + } +} diff --git a/pkg/util/enumvar.go b/pkg/util/enumvar.go new file mode 100644 index 0000000..2e8f185 --- /dev/null +++ b/pkg/util/enumvar.go @@ -0,0 +1,51 @@ +package util + +import ( + "fmt" + "strings" +) + +type EnumVar struct { + Allowed []string + variable *string +} + +// NewEnumVar give a list of allowed flag parameters +func NewEnumVar(allowed []string, variable *string) *EnumVar { + if variable == nil { + panic("variable should not be nil") + } + return &EnumVar{ + Allowed: allowed, + variable: variable, + } +} + +func (a *EnumVar) Default(d string) *EnumVar { + *a.variable = d + return a +} + +func (a EnumVar) String() string { + return *a.variable +} + +func (a *EnumVar) Set(p string) error { + isIncluded := func(opts []string, val string) bool { + for _, opt := range opts { + if val == opt { + return true + } + } + return false + } + if !isIncluded(a.Allowed, p) { + return fmt.Errorf("%s is not included in %s", p, strings.Join(a.Allowed, ",")) + } + *a.variable = p + return nil +} + +func (a *EnumVar) Type() string { + return "string" +}