From 454ffdacba34f8fb2b362ff7c1205dca6f89bff4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 20 Apr 2023 12:48:33 +0800 Subject: [PATCH 1/3] feat(oay): Add list objects v2 support Signed-off-by: Xuanwo --- Cargo.lock | 129 +++++++++++++- bin/oay/.gitignore | 1 + bin/oay/Cargo.toml | 10 +- bin/oay/oay.toml.example | 9 + bin/oay/src/bin/oay.rs | 29 +++- bin/oay/src/config.rs | 42 +++++ bin/oay/{ => src}/lib.rs | 5 + bin/oay/src/services/mod.rs | 19 +++ bin/oay/src/services/s3/mod.rs | 19 +++ bin/oay/src/services/s3/service.rs | 263 +++++++++++++++++++++++++++++ core/src/types/list.rs | 30 ++++ 11 files changed, 551 insertions(+), 5 deletions(-) create mode 100644 bin/oay/.gitignore create mode 100644 bin/oay/oay.toml.example create mode 100644 bin/oay/src/config.rs rename bin/oay/{ => src}/lib.rs (94%) create mode 100644 bin/oay/src/services/mod.rs create mode 100644 bin/oay/src/services/s3/mod.rs create mode 100644 bin/oay/src/services/s3/service.rs diff --git a/Cargo.lock b/Cargo.lock index 691c4a05ff4e..3a54fff4ae0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,6 +263,55 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "113713495a32dd0ab52baf5c10044725aa3aec00b31beda84218e469029b72a3" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backon" version = "0.4.0" @@ -1446,6 +1495,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "http-types" version = "2.12.0" @@ -1972,6 +2027,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "md-5" version = "0.10.5" @@ -2296,16 +2357,22 @@ name = "oay" version = "0.32.0" dependencies = [ "anyhow", + "axum", + "chrono", "clap 4.1.11", "dirs", - "env_logger", "futures", - "log", "opendal", + "quick-xml 0.27.1", "serde", "tokio", "toml 0.7.3", + "tower", + "tower-http", + "tracing", + "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -3717,6 +3784,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" +dependencies = [ + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -3987,6 +4063,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tagptr" version = "0.2.0" @@ -4256,6 +4338,47 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658" +dependencies = [ + "bitflags 1.3.2", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -4269,6 +4392,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4480,6 +4604,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" dependencies = [ "getrandom 0.2.8", + "rand 0.8.5", "serde", ] diff --git a/bin/oay/.gitignore b/bin/oay/.gitignore new file mode 100644 index 000000000000..e649572204f7 --- /dev/null +++ b/bin/oay/.gitignore @@ -0,0 +1 @@ +oay.toml diff --git a/bin/oay/Cargo.toml b/bin/oay/Cargo.toml index 3bde725cae2d..4f745ec27bf8 100644 --- a/bin/oay/Cargo.toml +++ b/bin/oay/Cargo.toml @@ -31,12 +31,13 @@ version.workspace = true [dependencies] anyhow = "1" +axum = "0.6" +chrono = "0.4.24" clap = { version = "4", features = ["cargo", "string"] } dirs = "5.0.0" -env_logger = "0.10" futures = "0.3" -log = "0.4" opendal.workspace = true +quick-xml = { version = "0.27", features = ["serialize", "overlapped-lists"] } serde = { version = "1", features = ["derive"] } tokio = { version = "1.27", features = [ "fs", @@ -45,4 +46,9 @@ tokio = { version = "1.27", features = [ "io-std", ] } toml = "0.7.3" +tower = "0.4" +tower-http = { version = "0.4", features = ["trace"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.3.1" +uuid = { version = "1", features = ["v4", "fast-rng"] } diff --git a/bin/oay/oay.toml.example b/bin/oay/oay.toml.example new file mode 100644 index 000000000000..ed478c769788 --- /dev/null +++ b/bin/oay/oay.toml.example @@ -0,0 +1,9 @@ +[backend] +type = "s3" + +access_key_id = "access_key_id" +secret_access_key = "secret_access_key" + +[frontends.s3] +enable = true +addr = "127.0.0.1:2000" diff --git a/bin/oay/src/bin/oay.rs b/bin/oay/src/bin/oay.rs index c9b3b63866ab..b5f4813f64c7 100644 --- a/bin/oay/src/bin/oay.rs +++ b/bin/oay/src/bin/oay.rs @@ -16,10 +16,37 @@ // under the License. use anyhow::Result; +use oay::services::S3Service; +use oay::Config; +use opendal::services::Memory; +use opendal::Operator; +use std::sync::Arc; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main] async fn main() -> Result<()> { - println!("Hello, world!"); + tracing_subscriber::registry() + .with(fmt::layer().pretty()) + .with(EnvFilter::from_default_env()) + .init(); + + let cfg: Config = Config { + backend: oay::BackendConfig { + typ: "memory".to_string(), + }, + frontends: oay::FrontendsConfig { + s3: oay::S3Config { + enable: true, + addr: "127.0.0.1:3000".to_string(), + }, + }, + }; + + let op = Operator::new(Memory::default())?.finish(); + + let s3 = S3Service::new(Arc::new(cfg), op); + + s3.serve().await?; Ok(()) } diff --git a/bin/oay/src/config.rs b/bin/oay/src/config.rs new file mode 100644 index 000000000000..64ab51c36a0e --- /dev/null +++ b/bin/oay/src/config.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub backend: BackendConfig, + pub frontends: FrontendsConfig, +} + +#[derive(Serialize, Deserialize)] +pub struct BackendConfig { + #[serde(rename = "type")] + pub typ: String, +} + +#[derive(Serialize, Deserialize)] +pub struct FrontendsConfig { + pub s3: S3Config, +} + +#[derive(Serialize, Deserialize)] +pub struct S3Config { + pub enable: bool, + pub addr: String, +} diff --git a/bin/oay/lib.rs b/bin/oay/src/lib.rs similarity index 94% rename from bin/oay/lib.rs rename to bin/oay/src/lib.rs index b248758bc120..ba5cf7a9dd36 100644 --- a/bin/oay/lib.rs +++ b/bin/oay/src/lib.rs @@ -14,3 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +pub mod services; + +mod config; +pub use config::*; diff --git a/bin/oay/src/services/mod.rs b/bin/oay/src/services/mod.rs new file mode 100644 index 000000000000..001951bef0b9 --- /dev/null +++ b/bin/oay/src/services/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod s3; +pub use s3::S3Service; diff --git a/bin/oay/src/services/s3/mod.rs b/bin/oay/src/services/s3/mod.rs new file mode 100644 index 000000000000..236d0d18e98f --- /dev/null +++ b/bin/oay/src/services/s3/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod service; +pub use service::*; diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs new file mode 100644 index 000000000000..efb546d8d239 --- /dev/null +++ b/bin/oay/src/services/s3/service.rs @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; + +use anyhow::anyhow; +use axum::extract::Query; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::response::Response; +use axum::routing::get; +use axum::Router; +use chrono::SecondsFormat; +use opendal::Lister; +use opendal::Metakey; +use opendal::Operator; +use serde::Deserialize; +use serde::Serialize; +use tower::ServiceBuilder; +use tower_http::trace::TraceLayer; +use tracing::debug; +use uuid::Uuid; + +use crate::Config; + +pub struct S3Service { + cfg: Arc, + op: Operator, +} + +impl S3Service { + pub fn new(cfg: Arc, op: Operator) -> Self { + Self { cfg, op } + } + + pub async fn serve(&self) -> anyhow::Result<()> { + let s3_cfg = &self.cfg.frontends.s3; + + let app = Router::new() + .route("/", get(handle_list_objects)) + .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())) + .with_state(S3State { + op: self.op.clone(), + list_objects: Arc::default(), + }); + + axum::Server::bind(&s3_cfg.addr.parse().unwrap()) + .serve(app.into_make_service()) + .await?; + + Ok(()) + } +} + +#[derive(Clone)] +pub struct S3State { + op: Operator, + /// TODO: remove this global lock by page checkpoint. + list_objects: Arc>>, +} + +/// # TODO +/// +/// we need to support following parameters: +/// +/// - max-keys +/// - start-after +#[derive(Deserialize, Default, Debug)] +#[serde(default)] +struct ListObjectsV2Params { + prefix: String, + delimiter: String, + continuation_token: String, +} + +async fn handle_list_objects( + state: State, + params: Query, +) -> Result { + debug!("got params: {:?}", params); + if params.delimiter != "/" && !params.delimiter.is_empty() { + return Err(anyhow!("delimiter is not supported").into()); + } + + let lister = state + .list_objects + .lock() + .unwrap() + .remove(format!("{}-{}", params.prefix, params.continuation_token).as_str()); + + let mut lister = match lister { + Some(lister) => lister, + None => { + if params.delimiter.is_empty() { + state.op.list(¶ms.prefix).await? + } else { + state.op.scan(¶ms.prefix).await? + } + } + }; + + let page = lister.next_page().await?.unwrap_or_default(); + + let is_truncated = lister.has_next().await?; + + let (mut common_prefixes, mut contents) = (vec![], vec![]); + for v in page { + let meta = state + .op + .metadata( + &v, + Metakey::Mode | Metakey::LastModified | Metakey::Etag | Metakey::ContentLength, + ) + .await?; + + if meta.is_dir() { + common_prefixes.push(CommonPrefix { + prefix: v.path().to_string(), + }); + } else { + contents.push(Object { + key: v.path().to_string(), + last_modified: meta + .last_modified() + .unwrap_or_default() + .to_rfc3339_opts(SecondsFormat::Millis, true), + etag: meta.etag().unwrap_or_default().to_string(), + size: meta.content_length(), + }); + } + } + + let next_continuation_token = if is_truncated { + let token = format!("{}-{}", params.prefix, Uuid::new_v4().as_u128()); + // Insert the lister into the state so that we can continue listing + state + .list_objects + .lock() + .unwrap() + .insert(token.clone(), lister); + token + } else { + String::new() + }; + + let resp = ListBucketResult { + is_truncated, + common_prefixes, + contents, + continuation_token: params.continuation_token.to_string(), + next_continuation_token, + }; + + Ok(OkResponse { + code: StatusCode::OK, + content: quick_xml::se::to_string(&resp).unwrap().into_bytes(), + }) +} + +#[derive(Serialize, Default)] +#[serde(default, rename_all = "PascalCase")] +struct ListBucketResult { + is_truncated: bool, + common_prefixes: Vec, + contents: Vec, + continuation_token: String, + next_continuation_token: String, +} + +#[derive(Serialize, Default)] +#[serde(default, rename_all = "PascalCase")] +struct CommonPrefix { + prefix: String, +} + +#[derive(Serialize, Default)] +#[serde(default, rename_all = "PascalCase")] +struct Object { + key: String, + last_modified: String, + etag: String, + size: u64, +} + +struct OkResponse { + code: StatusCode, + content: Vec, +} + +impl IntoResponse for OkResponse { + fn into_response(self) -> Response { + (self.code, self.content).into_response() + } +} + +struct ErrorResponse { + code: StatusCode, + err: Error, +} + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + (self.code, quick_xml::se::to_string(&self.err).unwrap()).into_response() + } +} + +#[derive(Serialize)] +#[serde(default, rename_all = "PascalCase")] +struct Error { + code: String, + message: String, + resource: String, + request_id: String, +} + +impl From for ErrorResponse { + fn from(err: opendal::Error) -> Self { + let err = Error { + code: "InternalError".to_string(), + message: err.to_string(), + resource: "".to_string(), + request_id: "".to_string(), + }; + + ErrorResponse { + code: StatusCode::INTERNAL_SERVER_ERROR, + err, + } + } +} + +impl From for ErrorResponse { + fn from(err: anyhow::Error) -> Self { + let err = Error { + code: "InternalError".to_string(), + message: err.to_string(), + resource: "".to_string(), + request_id: "".to_string(), + }; + + ErrorResponse { + code: StatusCode::INTERNAL_SERVER_ERROR, + err, + } + } +} diff --git a/core/src/types/list.rs b/core/src/types/list.rs index bc27688e4bba..02395063ca19 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -57,6 +57,36 @@ impl Lister { } } + /// has_next can be used to check if there are more pages. + pub async fn has_next(&mut self) -> Result { + debug_assert!( + self.fut.is_none(), + "there are ongoing futures for next page" + ); + + if !self.buf.is_empty() { + return Ok(true); + } + + let entries = match self + .pager + .as_mut() + .expect("pager must be valid") + .next() + .await? + { + // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. + // + // However, this could be changed as described in [impl From> for VecDeque](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) + Some(entries) => entries.into(), + None => return Ok(false), + }; + // Push fetched entries into buffer. + self.buf = entries; + + Ok(true) + } + /// next_page can be used to fetch a new page. /// /// # Notes From 95a1af8296b8e29fdd93fc3cb055ab1c07f8b4f4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 6 May 2023 23:42:57 +0800 Subject: [PATCH 2/3] Update Signed-off-by: Xuanwo --- bin/oay/README.md | 4 ++ bin/oay/src/services/s3/service.rs | 59 ++++++------------------------ 2 files changed, 16 insertions(+), 47 deletions(-) diff --git a/bin/oay/README.md b/bin/oay/README.md index caa23e57e101..766fd958a23f 100644 --- a/bin/oay/README.md +++ b/bin/oay/README.md @@ -11,3 +11,7 @@ Allow users to access different storage backend through their preferred APIs. ## Status Our first milestone is to provide S3 APIs. + +### S3 API + +Only `list_object_v2` with `start_after` is supported. diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs index efb546d8d239..e1c76259b4cc 100644 --- a/bin/oay/src/services/s3/service.rs +++ b/bin/oay/src/services/s3/service.rs @@ -15,11 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::sync::Arc; -use std::sync::Mutex; -use anyhow::anyhow; use axum::extract::Query; use axum::extract::State; use axum::http::StatusCode; @@ -28,7 +25,7 @@ use axum::response::Response; use axum::routing::get; use axum::Router; use chrono::SecondsFormat; -use opendal::Lister; +use opendal::ops::OpList; use opendal::Metakey; use opendal::Operator; use serde::Deserialize; @@ -36,7 +33,6 @@ use serde::Serialize; use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use tracing::debug; -use uuid::Uuid; use crate::Config; @@ -58,7 +54,6 @@ impl S3Service { .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())) .with_state(S3State { op: self.op.clone(), - list_objects: Arc::default(), }); axum::Server::bind(&s3_cfg.addr.parse().unwrap()) @@ -72,8 +67,6 @@ impl S3Service { #[derive(Clone)] pub struct S3State { op: Operator, - /// TODO: remove this global lock by page checkpoint. - list_objects: Arc>>, } /// # TODO @@ -81,13 +74,12 @@ pub struct S3State { /// we need to support following parameters: /// /// - max-keys -/// - start-after +/// - continuation_token #[derive(Deserialize, Default, Debug)] #[serde(default)] struct ListObjectsV2Params { prefix: String, - delimiter: String, - continuation_token: String, + start_after: String, } async fn handle_list_objects( @@ -95,26 +87,14 @@ async fn handle_list_objects( params: Query, ) -> Result { debug!("got params: {:?}", params); - if params.delimiter != "/" && !params.delimiter.is_empty() { - return Err(anyhow!("delimiter is not supported").into()); - } - - let lister = state - .list_objects - .lock() - .unwrap() - .remove(format!("{}-{}", params.prefix, params.continuation_token).as_str()); - let mut lister = match lister { - Some(lister) => lister, - None => { - if params.delimiter.is_empty() { - state.op.list(¶ms.prefix).await? - } else { - state.op.scan(¶ms.prefix).await? - } - } - }; + let mut lister = state + .op + .list_with( + ¶ms.prefix, + OpList::new().with_start_after(¶ms.start_after), + ) + .await?; let page = lister.next_page().await?.unwrap_or_default(); @@ -147,25 +127,11 @@ async fn handle_list_objects( } } - let next_continuation_token = if is_truncated { - let token = format!("{}-{}", params.prefix, Uuid::new_v4().as_u128()); - // Insert the lister into the state so that we can continue listing - state - .list_objects - .lock() - .unwrap() - .insert(token.clone(), lister); - token - } else { - String::new() - }; - let resp = ListBucketResult { is_truncated, common_prefixes, contents, - continuation_token: params.continuation_token.to_string(), - next_continuation_token, + start_after: Some(params.start_after.clone()), }; Ok(OkResponse { @@ -180,8 +146,7 @@ struct ListBucketResult { is_truncated: bool, common_prefixes: Vec, contents: Vec, - continuation_token: String, - next_continuation_token: String, + start_after: Option, } #[derive(Serialize, Default)] From 9072c051de982041bd7002f04d4fe054a356e649 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 6 May 2023 23:45:28 +0800 Subject: [PATCH 3/3] Return error if list with start_after is not supported Signed-off-by: Xuanwo --- bin/oay/src/services/s3/service.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs index e1c76259b4cc..84b9c7774fac 100644 --- a/bin/oay/src/services/s3/service.rs +++ b/bin/oay/src/services/s3/service.rs @@ -88,6 +88,18 @@ async fn handle_list_objects( ) -> Result { debug!("got params: {:?}", params); + if !state.op.info().capability().list_with_start_after { + return Err(ErrorResponse { + code: StatusCode::NOT_IMPLEMENTED, + err: Error { + code: "NotImplemented".to_string(), + message: "list with start after is not supported".to_string(), + resource: "".to_string(), + request_id: "".to_string(), + }, + }); + } + let mut lister = state .op .list_with(