diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index babda4279bc5..03d8c83eab77 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -16,47 +16,40 @@ // under the License. use async_trait::async_trait; -use http::{header, Request, Response, StatusCode}; -use serde::Deserialize; -use std::fmt::Debug; +use http::StatusCode; + +use std::{fmt::Debug, sync::Arc}; use crate::{ ops::{OpDelete, OpRead, OpWrite}, raw::{ - build_rooted_abs_path, new_request_build_error, parse_into_metadata, Accessor, - AccessorInfo, AsyncBody, HttpClient, IncomingAsyncBody, RpDelete, RpRead, RpWrite, + parse_into_metadata, Accessor, AccessorInfo, HttpClient, IncomingAsyncBody, RpDelete, + RpRead, RpWrite, }, types::Result, Capability, Error, ErrorKind, }; -use super::{error::parse_error, writer::GdriveWriter}; +use super::{core::GdriveCore, error::parse_error, writer::GdriveWriter}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct GdriveBackend { - root: String, - access_token: String, - client: HttpClient, + core: Arc, } impl GdriveBackend { pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self { GdriveBackend { - root, - access_token, - client: http_client, + core: Arc::new(GdriveCore { + root, + access_token, + client: http_client, + path_cache: Arc::default(), + }), } } } -impl Debug for GdriveBackend { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut de = f.debug_struct("GoolgeDriveBackend"); - de.field("root", &self.root); - de.finish() - } -} - #[async_trait] impl Accessor for GdriveBackend { type Reader = IncomingAsyncBody; @@ -69,7 +62,7 @@ impl Accessor for GdriveBackend { fn info(&self) -> AccessorInfo { let mut ma = AccessorInfo::default(); ma.set_scheme(crate::Scheme::Gdrive) - .set_root(&self.root) + .set_root(&self.core.root) .set_capability(Capability { read: true, write: true, @@ -81,7 +74,7 @@ impl Accessor for GdriveBackend { } async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.gdrive_get(path).await?; + let resp = self.core.gdrive_get(path).await?; let status = resp.status(); @@ -104,12 +97,12 @@ impl Accessor for GdriveBackend { Ok(( RpWrite::default(), - GdriveWriter::new(self.clone(), args, String::from(path)), + GdriveWriter::new(self.core.clone(), args, String::from(path)), )) } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.gdrive_delete(path).await?; + let resp = self.core.gdrive_delete(path).await?; let status = resp.status(); @@ -119,134 +112,3 @@ impl Accessor for GdriveBackend { } } } - -impl GdriveBackend { - async fn get_abs_root_id(&self) -> String { - let mut req = Request::get("https://www.googleapis.com/drive/v3/files/root"); - let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error) - .unwrap(); - - let resp = self.client.send(req).await.unwrap(); - - let body_value: GdriveFile = - serde_json::from_slice(&resp.into_body().bytes().await.unwrap()).unwrap(); - let root_id = String::from(body_value.id.as_str()); - root_id - } - - async fn get_file_id_by_path(&self, file_path: &str) -> String { - let path = build_rooted_abs_path(&self.root, file_path); - let auth_header_content = format!("Bearer {}", self.access_token); - - let mut parent_id = self.get_abs_root_id().await; - let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); - - for (i, item) in file_path_items.iter().enumerate() { - let mut query = format!( - "name = '{}' and parents = '{}' and trashed = false", - item, parent_id - ); - if i != file_path_items.len() - 1 { - query += "and mimeType = 'application/vnd.google-apps.folder'"; - } - let query: String = query.chars().filter(|c| !c.is_whitespace()).collect(); - - let mut req = Request::get(format!( - "https://www.googleapis.com/drive/v3/files?q={}", - query - )); - req = req.header(header::AUTHORIZATION, &auth_header_content); - let req = req - .body(AsyncBody::default()) - .map_err(new_request_build_error) - .unwrap(); - - let resp = self.client.send(req).await.unwrap(); - - let body_value: GdriveFileList = - serde_json::from_slice(&resp.into_body().bytes().await.unwrap()).unwrap(); - parent_id = String::from(body_value.files[0].id.as_str()); - } - - parent_id - } - - async fn gdrive_get(&self, path: &str) -> Result> { - let url: String = format!( - "https://www.googleapis.com/drive/v3/files/{}?alt=media", - self.get_file_id_by_path(path).await - ); - - let auth_header_content = format!("Bearer {}", self.access_token); - let mut req = Request::get(&url); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub async fn gdrive_update( - &self, - path: &str, - size: Option, - content_type: Option<&str>, - body: AsyncBody, - ) -> Result> { - let url = format!( - "https://www.googleapis.com/upload/drive/v3/files/{}", - self.get_file_id_by_path(path).await - ); - - let mut req = Request::patch(&url); - - let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - if let Some(size) = size { - req = req.header(header::CONTENT_LENGTH, size) - } - - if let Some(mime) = content_type { - req = req.header(header::CONTENT_TYPE, mime) - } - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn gdrive_delete(&self, path: &str) -> Result> { - let url = format!( - "https://www.googleapis.com/drive/v3/files/{}", - self.get_file_id_by_path(path).await - ); - - let mut req = Request::delete(&url); - - let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } -} - -#[derive(Deserialize)] -struct GdriveFile { - id: String, -} - -#[derive(Deserialize)] -struct GdriveFileList { - files: Vec, -} diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs new file mode 100644 index 000000000000..6f66bf8bddec --- /dev/null +++ b/core/src/services/gdrive/core.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use crate::raw::new_json_deserialize_error; +use crate::raw::percent_encode_path; +use crate::raw::HttpClient; +use crate::Error; +use crate::ErrorKind; + +use http::request::Builder; +use http::StatusCode; +use http::{header, Request, Response}; +use serde::Deserialize; +use tokio::sync::Mutex; + +use crate::{ + raw::{build_rooted_abs_path, new_request_build_error, AsyncBody, IncomingAsyncBody}, + types::Result, +}; + +use super::error::parse_error; + +pub struct GdriveCore { + pub root: String, + pub access_token: String, + pub client: HttpClient, + pub path_cache: Arc>>, +} + +impl Debug for GdriveCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut de = f.debug_struct("GdriveCore"); + de.field("root", &self.root); + de.finish() + } +} + +impl GdriveCore { + async fn get_abs_root_id(&self) -> Result { + let root = "root"; + + if let Some(root_id) = self.path_cache.lock().await.get(root) { + return Ok(root_id.to_string()); + } + + let req = self + .sign(Request::get( + "https://www.googleapis.com/drive/v3/files/root", + )) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + + let gdrive_file: GdriveFile = + serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + + let root_id = gdrive_file.id; + + let mut cache_guard = self.path_cache.lock().await; + cache_guard.insert(root.to_owned(), root_id.clone()); + + Ok(root_id) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn get_file_id_by_path(&self, file_path: &str) -> Result { + let path = build_rooted_abs_path(&self.root, file_path); + + if let Some(file_id) = self.path_cache.lock().await.get(&path) { + return Ok(file_id.to_string()); + } + + let mut parent_id = self.get_abs_root_id().await?; + let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); + + for (i, item) in file_path_items.iter().enumerate() { + let mut query = format!( + "name = '{}' and parents = '{}' and trashed = false", + item, parent_id + ); + if i != file_path_items.len() - 1 { + query += "and mimeType = 'application/vnd.google-apps.folder'"; + } + + let req = self + .sign(Request::get(format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(&query) + ))) + .body(AsyncBody::default()) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + if status == StatusCode::OK { + let resp_body = &resp.into_body().bytes().await?; + + let gdrive_file_list: GdriveFileList = + serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + + if gdrive_file_list.files.len() != 1 { + return Err(Error::new(ErrorKind::Unexpected, &format!("Please ensure that the file corresponding to the path exists and is unique. The response body is {}", String::from_utf8_lossy(resp_body)))); + } + + parent_id = gdrive_file_list.files[0].id.clone(); + } else { + return Err(parse_error(resp).await?); + } + } + + let mut cache_guard = self.path_cache.lock().await; + cache_guard.insert(path, parent_id.clone()); + + Ok(parent_id) + } + + pub async fn gdrive_get(&self, path: &str) -> Result> { + let url: String = format!( + "https://www.googleapis.com/drive/v3/files/{}?alt=media", + self.get_file_id_by_path(path).await? + ); + + let req = self + .sign(Request::get(&url)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn gdrive_update( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result> { + let url = format!( + "https://www.googleapis.com/upload/drive/v3/files/{}", + self.get_file_id_by_path(path).await? + ); + + let mut req = Request::patch(&url); + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size) + } + + if let Some(mime) = content_type { + req = req.header(header::CONTENT_TYPE, mime) + } + + let req = self.sign(req).body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn gdrive_delete(&self, path: &str) -> Result> { + let url = format!( + "https://www.googleapis.com/drive/v3/files/{}", + self.get_file_id_by_path(path).await? + ); + + let req = self + .sign(Request::delete(&url)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + fn sign(&self, mut req: Builder) -> Builder { + let auth_header_content = format!("Bearer {}", self.access_token); + req = req.header(header::AUTHORIZATION, auth_header_content); + req + } +} + +// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File +#[derive(Deserialize)] +struct GdriveFile { + id: String, +} + +// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list +#[derive(Deserialize)] +struct GdriveFileList { + files: Vec, +} diff --git a/core/src/services/gdrive/mod.rs b/core/src/services/gdrive/mod.rs index 7c88f4fe7458..5f77ecc50b21 100644 --- a/core/src/services/gdrive/mod.rs +++ b/core/src/services/gdrive/mod.rs @@ -17,6 +17,7 @@ mod backend; mod builder; +mod core; mod error; pub use builder::GdriveBuilder as Gdrive; diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index de9067aad0e2..dcff07d29b90 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -15,26 +15,27 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use http::StatusCode; -use super::backend::GdriveBackend; +use super::core::GdriveCore; use super::error::parse_error; use crate::ops::OpWrite; use crate::raw::*; use crate::*; pub struct GdriveWriter { - backend: GdriveBackend, - + core: Arc, op: OpWrite, path: String, } impl GdriveWriter { - pub fn new(backend: GdriveBackend, op: OpWrite, path: String) -> Self { - GdriveWriter { backend, op, path } + pub fn new(core: Arc, op: OpWrite, path: String) -> Self { + GdriveWriter { core, op, path } } } @@ -42,7 +43,7 @@ impl GdriveWriter { impl oio::Write for GdriveWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { let resp = self - .backend + .core .gdrive_update( &self.path, Some(bs.len()),