Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openviking/agfs_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def _generate_config(self) -> Path:
"disable_ssl": not self.s3_config.use_ssl,
"use_path_style": self.s3_config.use_path_style,
"directory_marker_mode": self.s3_config.directory_marker_mode.value,
"disable_batch_delete": self.s3_config.disable_batch_delete,
}

config["plugins"]["s3fs"] = {
Expand Down
7 changes: 7 additions & 0 deletions openviking_cli/utils/config/agfs_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class S3Config(BaseModel):
description="How to persist S3 directory markers: 'none' skips marker creation, 'empty' writes a zero-byte marker, and 'nonempty' writes a non-empty marker payload. Defaults to 'empty'.",
)

disable_batch_delete: bool = Field(
default=False,
description="Disable batch delete (DeleteObjects) and use sequential single-object deletes instead. "
"Required for S3-compatible services like Alibaba Cloud OSS that require a Content-MD5 header "
"for DeleteObjects but AWS SDK v2 does not send it by default. Defaults to False.",
)

model_config = {"extra": "forbid"}

def validate_config(self):
Expand Down
33 changes: 25 additions & 8 deletions third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type S3Client struct {
region string // AWS region
prefix string // Optional prefix for all keys
directoryMarkerMode DirectoryMarkerMode
disableBatchDelete bool // Disable batch delete for OSS compatibility
}

// S3Config holds S3 client configuration
Expand All @@ -45,6 +46,7 @@ type S3Config struct {
DisableSSL bool // For testing with local S3
UsePathStyle bool // Whether to use path-style addressing (true) or virtual-host-style (false)
DirectoryMarkerMode DirectoryMarkerMode
DisableBatchDelete bool // Disable batch delete (DeleteObjects) for S3-compatible services
}

var nonEmptyDirectoryMarkerPayload = []byte{'\n'}
Expand Down Expand Up @@ -134,6 +136,7 @@ func NewS3Client(cfg S3Config) (*S3Client, error) {
region: cfg.Region,
prefix: prefix,
directoryMarkerMode: normalizeDirectoryMarkerMode(cfg.DirectoryMarkerMode),
disableBatchDelete: cfg.DisableBatchDelete,
}, nil
}

Expand Down Expand Up @@ -412,14 +415,28 @@ func (c *S3Client) DeleteDirectory(ctx context.Context, path string) error {
end = len(objectsToDelete)
}

_, err := c.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.bucket),
Delete: &types.Delete{
Objects: objectsToDelete[i:end],
},
})
if err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
if c.disableBatchDelete {
// Sequential single-object delete for S3-compatible services (e.g., Alibaba Cloud OSS)
// that require Content-MD5 for DeleteObjects but AWS SDK v2 does not send it by default.
for _, obj := range objectsToDelete[i:end] {
_, err := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(c.bucket),
Key: obj.Key,
})
if err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
}
}
} else {
_, err := c.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(c.bucket),
Delete: &types.Delete{
Objects: objectsToDelete[i:end],
},
})
if err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (p *S3FSPlugin) Validate(cfg map[string]interface{}) error {
allowedKeys := []string{
"bucket", "region", "access_key_id", "secret_access_key", "endpoint", "prefix", "disable_ssl", "mount_path",
"cache_enabled", "cache_ttl", "stat_cache_ttl", "cache_max_size", "use_path_style",
"directory_marker_mode",
"directory_marker_mode", "disable_batch_delete",
}
if err := config.ValidateOnlyKnownKeys(cfg, allowedKeys); err != nil {
return err
Expand Down Expand Up @@ -651,6 +651,7 @@ func (p *S3FSPlugin) Initialize(config map[string]interface{}) error {
DisableSSL: getBoolConfig(config, "disable_ssl", false),
UsePathStyle: getBoolConfig(config, "use_path_style", true),
DirectoryMarkerMode: directoryMarkerMode,
DisableBatchDelete: getBoolConfig(config, "disable_batch_delete", false),
}

if cfg.Bucket == "" {
Expand Down