Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl<S: Adapter> Accessor for Backend<S> {
if cap.read {
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;
}

Expand Down
1 change: 1 addition & 0 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ impl Accessor for AzblobBackend {

read: true,
read_can_next: true,
read_with_range: true,
read_with_if_match: true,
read_with_if_none_match: true,
read_with_override_content_disposition: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/azdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ impl Accessor for AzdfsBackend {
.set_capability(Capability {
read: true,
read_can_next: true,
read_with_range: true,
write: true,
rename: true,
list: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl Accessor for FsBackend {
.set_capability(Capability {
read: true,
read_can_seek: true,
read_with_range: true,
write: true,
write_without_content_length: true,
create_dir: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/ftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl Accessor for FtpBackend {
.set_root(&self.root)
.set_capability(Capability {
read: true,
read_with_range: true,
write: true,
list: true,
..Default::default()
Expand Down
1 change: 1 addition & 0 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl Accessor for GcsBackend {

read: true,
read_can_next: true,
read_with_range: true,
read_with_if_match: true,
read_with_if_none_match: true,

Expand Down
2 changes: 2 additions & 0 deletions core/src/services/ghac/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ impl Accessor for GhacBackend {
.set_capability(Capability {
read: true,
read_can_next: true,
read_with_range: true,

write: true,

..Default::default()
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ impl Accessor for HdfsBackend {
.set_capability(Capability {
read: true,
read_can_seek: true,
read_with_range: true,

write: true,
list: true,
blocking: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ impl Accessor for HttpBackend {

read: true,
read_can_next: true,
read_with_range: true,
read_with_if_match: true,
read_with_if_none_match: true,

Expand Down
1 change: 1 addition & 0 deletions core/src/services/ipfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl Accessor for IpfsBackend {
.set_capability(Capability {
read: true,
read_can_next: true,
read_with_range: true,
list: true,

..Default::default()
Expand Down
1 change: 1 addition & 0 deletions core/src/services/ipmfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl Accessor for IpmfsBackend {
.set_capability(Capability {
read: true,
read_can_next: true,
read_with_range: true,
write: true,

..Default::default()
Expand Down
1 change: 1 addition & 0 deletions core/src/services/obs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl Accessor for ObsBackend {

read: true,
read_can_next: true,
read_with_range: true,
read_with_if_match: true,
read_with_if_none_match: true,

Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ impl Accessor for OssBackend {

read: true,
read_can_next: true,
read_with_range: true,
read_with_if_match: true,
read_with_if_none_match: true,

Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ impl Accessor for S3Backend {

read: true,
read_can_next: true,
read_with_range: true,
read_with_if_match: true,
read_with_if_none_match: true,
read_with_override_cache_control: true,
Expand Down
67 changes: 58 additions & 9 deletions core/src/services/supabase/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,42 @@ impl Accessor for SupabaseBackend {
stat: true,
read: true,
write: true,
create_dir: true,
delete: true,

..Default::default()
});

am
}

async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.supabase_get_object(path).await?;
async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
let mut req =
self.core
.supabase_upload_object_request(path, Some(0), None, AsyncBody::Empty)?;

self.core.sign(&mut req)?;

let resp = self.core.send(req).await?;

let status = resp.status();

if status.is_success() {
resp.into_body().consume().await?;
Ok(RpCreate::default())
} else {
// create duplicate dir is ok
let e = parse_error(resp).await?;
if e.kind() == ErrorKind::AlreadyExists {
Ok(RpCreate::default())
} else {
Err(e)
}
}
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.supabase_get_object(path, args.range()).await?;

let status = resp.status();

Expand Down Expand Up @@ -259,16 +286,38 @@ impl Accessor for SupabaseBackend {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}

let resp = self.core.supabase_get_object_info(path).await?;
// The get_object_info does not contain the file size. Therefore
Comment thread
Xuanwo marked this conversation as resolved.
// we first try the get the metadata through head, if we fail,
// we then use get_object_info to get the actual error info
let mut resp = self.core.supabase_head_object(path).await?;

let status = resp.status();

match status {
match resp.status() {
StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
StatusCode::NOT_FOUND if path.ends_with('/') => {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
_ => {
resp = self.core.supabase_get_object_info(path).await?;
Comment thread
xyjixyjixyji marked this conversation as resolved.
match resp.status() {
StatusCode::NOT_FOUND if path.ends_with('/') => {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
}
_ => Err(parse_error(resp).await?),
}
}
}
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let resp = self.core.supabase_delete_object(path).await?;

if resp.status().is_success() {
Ok(RpDelete::default())
} else {
// deleting not existing objects is ok
let e = parse_error(resp).await?;
if e.kind() == ErrorKind::NotFound {
Ok(RpDelete::default())
} else {
Err(e)
}
_ => Err(parse_error(resp).await?),
}
}
}
84 changes: 77 additions & 7 deletions core/src/services/supabase/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,25 @@ impl SupabaseCore {
Ok(req)
}

pub fn supabase_get_object_public_request(&self, path: &str) -> Result<Request<AsyncBody>> {
pub fn supabase_delete_object_request(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/storage/v1/object/{}/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

Request::delete(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)
}

pub fn supabase_get_object_public_request(
&self,
path: &str,
_: BytesRange,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/storage/v1/object/public/{}/{}",
Expand All @@ -118,12 +136,44 @@ impl SupabaseCore {
percent_encode_path(&p)
);

Request::get(&url)
let req = Request::get(&url);

req.body(AsyncBody::Empty).map_err(new_request_build_error)
}

pub fn supabase_get_object_auth_request(
&self,
path: &str,
_: BytesRange,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/storage/v1/object/authenticated/{}/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

let req = Request::get(&url);

req.body(AsyncBody::Empty).map_err(new_request_build_error)
}

pub fn supabase_head_object_public_request(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/storage/v1/object/public/{}/{}",
self.endpoint,
self.bucket,
percent_encode_path(&p)
);

Request::head(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)
}

pub fn supabase_get_object_auth_request(&self, path: &str) -> Result<Request<AsyncBody>> {
pub fn supabase_head_object_auth_request(&self, path: &str) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/storage/v1/object/authenticated/{}/{}",
Expand All @@ -132,7 +182,7 @@ impl SupabaseCore {
percent_encode_path(&p)
);

Request::get(&url)
Request::head(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)
}
Expand Down Expand Up @@ -175,11 +225,25 @@ impl SupabaseCore {
self.http_client.send(req).await
}

pub async fn supabase_get_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
pub async fn supabase_get_object(
&self,
path: &str,
range: BytesRange,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = if self.key.is_some() {
self.supabase_get_object_auth_request(path, range)?
} else {
self.supabase_get_object_public_request(path, range)?
};
self.sign(&mut req)?;
self.send(req).await
}

pub async fn supabase_head_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let mut req = if self.key.is_some() {
self.supabase_get_object_auth_request(path)?
self.supabase_head_object_auth_request(path)?
} else {
self.supabase_get_object_public_request(path)?
self.supabase_head_object_public_request(path)?
};
self.sign(&mut req)?;
self.send(req).await
Expand All @@ -197,4 +261,10 @@ impl SupabaseCore {
self.sign(&mut req)?;
self.send(req).await
}

pub async fn supabase_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
let mut req = self.supabase_delete_object_request(path)?;
self.sign(&mut req)?;
self.send(req).await
}
}
36 changes: 21 additions & 15 deletions core/src/services/supabase/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,12 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
let bs = body.bytes().await?;

// todo: the supabase error has status code 4XX, handle all that
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

let (mut kind, mut retryable) = (ErrorKind::Unexpected, false);
let (message, _) = from_slice::<SupabaseError>(&bs)
.map(|sb_err| (format!("{sb_err:?}"), Some(sb_err)))
.map(|sb_err| {
(kind, retryable) = parse_supabase_error(&sb_err);
(format!("{sb_err:?}"), Some(sb_err))
})
.unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));

let mut err = Error::new(kind, &message).with_context("response", format!("{parts:?}"));
Expand All @@ -65,3 +55,19 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {

Ok(err)
}

// Return the error kind and whether it is retryable
fn parse_supabase_error(err: &SupabaseError) -> (ErrorKind, bool) {
let code = err.status_code.parse::<u16>().unwrap();
if code == StatusCode::CONFLICT.as_u16() && err.error == "Duplicate" {
(ErrorKind::AlreadyExists, false)
} else if code == StatusCode::NOT_FOUND.as_u16() {
(ErrorKind::NotFound, false)
} else if code == StatusCode::FORBIDDEN.as_u16() {
(ErrorKind::PermissionDenied, false)
} else if code == StatusCode::PRECONDITION_FAILED.as_u16() {
(ErrorKind::ConditionNotMatch, false)
} else {
(ErrorKind::Unexpected, false)
}
}
1 change: 1 addition & 0 deletions core/src/services/wasabi/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ impl Accessor for WasabiBackend {

read: true,
read_can_next: true,
read_with_range: true,

write: true,
list: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/webdav/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ impl Accessor for WebdavBackend {
.set_capability(Capability {
read: true,
read_can_next: true,
read_with_range: true,
write: true,
list: true,
copy: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ impl Accessor for WebhdfsBackend {
.set_capability(Capability {
read: true,
read_can_next: true,
read_with_range: true,
write: true,
list: true,
..Default::default()
Expand Down
Loading