From 94b089b59baff9831744e27e360d51faf1b68ae8 Mon Sep 17 00:00:00 2001 From: yuanqianhe Date: Thu, 9 Apr 2026 16:09:07 +0800 Subject: [PATCH] feat(s3fs): add disable_batch_delete option for OSS compatibility (#1330) --- openviking/agfs_manager.py | 1 + openviking_cli/utils/config/agfs_config.py | 7 ++++ .../agfs-server/pkg/plugins/s3fs/client.go | 33 ++++++++++++++----- .../agfs/agfs-server/pkg/plugins/s3fs/s3fs.go | 3 +- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/openviking/agfs_manager.py b/openviking/agfs_manager.py index 1f10282d2..cbb59e1f7 100644 --- a/openviking/agfs_manager.py +++ b/openviking/agfs_manager.py @@ -176,6 +176,7 @@ def _generate_config(self) -> Path: "disable_ssl": not self.s3_config.use_ssl, "use_path_style": self.s3_config.use_path_style, "directory_marker_mode": self.s3_config.directory_marker_mode.value, + "disable_batch_delete": self.s3_config.disable_batch_delete, } config["plugins"]["s3fs"] = { diff --git a/openviking_cli/utils/config/agfs_config.py b/openviking_cli/utils/config/agfs_config.py index fb02331a1..1c694c590 100644 --- a/openviking_cli/utils/config/agfs_config.py +++ b/openviking_cli/utils/config/agfs_config.py @@ -60,6 +60,13 @@ class S3Config(BaseModel): description="How to persist S3 directory markers: 'none' skips marker creation, 'empty' writes a zero-byte marker, and 'nonempty' writes a non-empty marker payload. Defaults to 'empty'.", ) + disable_batch_delete: bool = Field( + default=False, + description="Disable batch delete (DeleteObjects) and use sequential single-object deletes instead. " + "Required for S3-compatible services like Alibaba Cloud OSS that require a Content-MD5 header " + "for DeleteObjects but AWS SDK v2 does not send it by default. Defaults to False.", + ) + model_config = {"extra": "forbid"} def validate_config(self): diff --git a/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go b/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go index 3579d9f4c..e47dc64e0 100644 --- a/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go +++ b/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go @@ -32,6 +32,7 @@ type S3Client struct { region string // AWS region prefix string // Optional prefix for all keys directoryMarkerMode DirectoryMarkerMode + disableBatchDelete bool // Disable batch delete for OSS compatibility } // S3Config holds S3 client configuration @@ -45,6 +46,7 @@ type S3Config struct { DisableSSL bool // For testing with local S3 UsePathStyle bool // Whether to use path-style addressing (true) or virtual-host-style (false) DirectoryMarkerMode DirectoryMarkerMode + DisableBatchDelete bool // Disable batch delete (DeleteObjects) for S3-compatible services } var nonEmptyDirectoryMarkerPayload = []byte{'\n'} @@ -134,6 +136,7 @@ func NewS3Client(cfg S3Config) (*S3Client, error) { region: cfg.Region, prefix: prefix, directoryMarkerMode: normalizeDirectoryMarkerMode(cfg.DirectoryMarkerMode), + disableBatchDelete: cfg.DisableBatchDelete, }, nil } @@ -412,14 +415,28 @@ func (c *S3Client) DeleteDirectory(ctx context.Context, path string) error { end = len(objectsToDelete) } - _, err := c.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ - Bucket: aws.String(c.bucket), - Delete: &types.Delete{ - Objects: objectsToDelete[i:end], - }, - }) - if err != nil { - return fmt.Errorf("failed to delete objects: %w", err) + if c.disableBatchDelete { + // Sequential single-object delete for S3-compatible services (e.g., Alibaba Cloud OSS) + // that require Content-MD5 for DeleteObjects but AWS SDK v2 does not send it by default. + for _, obj := range objectsToDelete[i:end] { + _, err := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(c.bucket), + Key: obj.Key, + }) + if err != nil { + return fmt.Errorf("failed to delete objects: %w", err) + } + } + } else { + _, err := c.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(c.bucket), + Delete: &types.Delete{ + Objects: objectsToDelete[i:end], + }, + }) + if err != nil { + return fmt.Errorf("failed to delete objects: %w", err) + } } } diff --git a/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go b/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go index 16699168f..76258849b 100644 --- a/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go +++ b/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go @@ -589,7 +589,7 @@ func (p *S3FSPlugin) Validate(cfg map[string]interface{}) error { allowedKeys := []string{ "bucket", "region", "access_key_id", "secret_access_key", "endpoint", "prefix", "disable_ssl", "mount_path", "cache_enabled", "cache_ttl", "stat_cache_ttl", "cache_max_size", "use_path_style", - "directory_marker_mode", + "directory_marker_mode", "disable_batch_delete", } if err := config.ValidateOnlyKnownKeys(cfg, allowedKeys); err != nil { return err @@ -651,6 +651,7 @@ func (p *S3FSPlugin) Initialize(config map[string]interface{}) error { DisableSSL: getBoolConfig(config, "disable_ssl", false), UsePathStyle: getBoolConfig(config, "use_path_style", true), DirectoryMarkerMode: directoryMarkerMode, + DisableBatchDelete: getBoolConfig(config, "disable_batch_delete", false), } if cfg.Bucket == "" {