diff --git a/.github/workflows/service_test_webdav.yml b/.github/workflows/service_test_webdav.yml index a67d2c7d161f..384c086c83aa 100644 --- a/.github/workflows/service_test_webdav.yml +++ b/.github/workflows/service_test_webdav.yml @@ -112,3 +112,40 @@ jobs: OPENDAL_WEBDAV_ENDPOINT: http://127.0.0.1:8080 OPENDAL_WEBDAV_USERNAME: bar OPENDAL_WEBDAV_PASSWORD: bar + + nginx_with_redirect: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + + - name: Install nginx full for dav_ext modules + run: sudo apt install nginx-full + + - name: Start nginx + shell: bash + working-directory: core + run: | + mkdir -p /tmp/static + mkdir -p /var/lib/nginx + # make nginx worker has permission to operate it + chmod a+rw /tmp/static/ + # make nginx worker has permission to operate it + sudo chmod 777 /var/lib/nginx/body + nginx -c `pwd`/src/services/webdav/fixtures/nginx.conf + + - name: Test with redirect + shell: bash + working-directory: core + run: | + cargo test webdav -- --show-output + env: + RUST_BACKTRACE: full + RUST_LOG: debug + OPENDAL_WEBDAV_TEST: on + OPENDAL_WEBDAV_ENDPOINT: http://127.0.0.1:8081 diff --git a/Cargo.lock b/Cargo.lock index a7f40dc41051..097003bb903c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2674,6 +2674,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "tracing-subscriber", + "url", "uuid", "wiremock", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 5521bedbf1cf..249707fed612 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -207,6 +207,7 @@ reqsign = { version = "0.12.0", default-features = false, optional = true } reqwest = { version = "0.11.13", features = [ "stream", ], default-features = false } +url = { version = "2.2" } # version should follow reqwest rocksdb = { version = "0.21.0", default-features = false, optional = true } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index c6cb38a468a4..80a998d02863 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -21,14 +21,16 @@ use std::mem; use std::str::FromStr; use futures::TryStreamExt; -use http::Request; use http::Response; +use http::{header, Request, StatusCode}; +use log::debug; use reqwest::redirect::Policy; -use reqwest::Url; +use url::{ParseError, Url}; use super::body::IncomingAsyncBody; use super::parse_content_length; use super::AsyncBody; +use crate::raw::parse_location; use crate::Error; use crate::ErrorKind; use crate::Result; @@ -153,4 +155,169 @@ impl HttpClient { Ok(resp) } + + /// Send a request in async way with handling redirection logic. + /// Now we only support redirect GET request. + /// # Arguments + /// * `times` - how many times do you want to send request when we need to handle redirection + pub async fn send_with_redirect( + &self, + req: Request, + times: usize, + ) -> Result> { + if req.method() != http::Method::GET { + // for now we only handle redirection for GET request + // and please note that we don't support stream request either. + return Err(Error::new( + ErrorKind::Unsupported, + "redirect for unsupported HTTP method", + ) + .with_operation("http_util::Client::send_with_redirect_async") + .with_context("method", req.method().as_str())); + } + + let mut prev_req = self.clone_request(&req); + let mut prev_resp = self.send(req).await?; + let mut retries = 0; + + let resp = loop { + let status = prev_resp.status(); + // for now we only handle 302/308 for 3xx status + // notice that our redirect logic may not follow the HTTP standard + let should_redirect = match status { + StatusCode::FOUND => { + // theoretically we need to handle following status also: + // - StatusCode::MOVED_PERMANENTLY + // - StatusCode::SEE_OTHER + let mut new_req = self.clone_request(&prev_req); + for header in &[ + header::TRANSFER_ENCODING, + header::CONTENT_ENCODING, + header::CONTENT_TYPE, + header::CONTENT_LENGTH, + ] { + new_req.headers_mut().remove(header); + } + // see https://www.rfc-editor.org/rfc/rfc9110.html#section-15.4.2 + // theoretically for 301, 302 and 303 should change + // original http method to GET except HEAD + // even though we only support GET request for now, + // just in case we support other HTTP method in the future + // add method modification logic here + match new_req.method() { + &http::Method::GET | &http::Method::HEAD => {} + _ => *new_req.method_mut() = http::Method::GET, + } + Some(new_req) + } + // theoretically we need to handle following status also: + // - StatusCode::PERMANENT_REDIRECT + StatusCode::TEMPORARY_REDIRECT => Some(self.clone_request(&prev_req)), + _ => None, + }; + + retries += 1; + if retries > times || should_redirect.is_none() { + // exceeds maximum retry times or no need to redirect request + // just return last response + debug!("no need to redirect or reach the maximum retry times"); + break prev_resp; + } + debug!( + "it is the {} time for http client to retry. maximum times: {}", + retries, times + ); + + if let Some(mut redirect_req) = should_redirect { + let prev_url_str = redirect_req.uri().to_string(); + let prev_url = Url::parse(&prev_url_str).map_err(|e| { + Error::new(ErrorKind::Unexpected, "url is not valid") + .with_context("url", &prev_url_str) + .set_source(e) + })?; + + let loc = parse_location(prev_resp.headers())? + // no location means invalid redirect response + .ok_or_else(|| { + debug!( + "no location headers in response, url: {}, headers: {:?}", + &prev_url_str, + &prev_resp.headers() + ); + Error::new( + ErrorKind::Unexpected, + "no location header in redirect response", + ) + .with_context("method", redirect_req.method().as_str()) + .with_context("url", &prev_url_str) + })?; + + // one url with origin and path + let loc_url = Url::parse(loc).or_else(|err| { + match err { + ParseError::RelativeUrlWithoutBase => { + debug!("redirected location is relative url, will join it to original base url. loc: {}", loc); + let url = prev_url.clone().join(loc).map_err(|err| { + Error::new(ErrorKind::Unexpected, "invalid redirect base url and path") + .with_context("base", &prev_url_str) + .with_context("path", loc) + .set_source(err) + })?; + Ok(url) + } + err => { + Err( + Error::new(ErrorKind::Unexpected, "invalid location header") + .with_context("location", loc) + .set_source(err) + ) + } + } + })?; + + debug!("redirecting '{}' to '{}'", &prev_url_str, loc_url.as_str()); + self.remove_sensitive_headers(&mut redirect_req, &loc_url, &prev_url); + // change the request uri + *redirect_req.uri_mut() = loc_url.as_str().parse().map_err(|err| { + Error::new(ErrorKind::Unexpected, "new redirect url is invalid") + .with_context("loc", loc_url.as_str()) + .set_source(err) + })?; + prev_req = self.clone_request(&redirect_req); + prev_resp = self.send(redirect_req).await?; + } + }; + Ok(resp) + } +} + +impl HttpClient { + fn clone_request(&self, req: &Request) -> Request { + let (mut parts, body) = Request::new(match req.body() { + AsyncBody::Empty => AsyncBody::Empty, + AsyncBody::Bytes(bytes) => AsyncBody::Bytes(bytes.clone()), + }) + .into_parts(); + + // we just ignore extensions of request, because we won't use it + parts.method = req.method().clone(); + parts.uri = req.uri().clone(); + parts.version = req.version(); + parts.headers = req.headers().clone(); + + Request::from_parts(parts, body) + } + + fn remove_sensitive_headers(&self, req: &mut Request, next: &Url, previous: &Url) { + let cross_host = next.host_str() != previous.host_str() + || next.port_or_known_default() != previous.port_or_known_default(); + if cross_host { + let headers = req.headers_mut(); + headers.remove(header::AUTHORIZATION); + headers.remove(header::COOKIE); + headers.remove("cookie2"); + headers.remove(header::PROXY_AUTHORIZATION); + headers.remove(header::WWW_AUTHENTICATE); + } + } } diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 6e98c50a3c5d..06450383148b 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -198,7 +198,7 @@ impl Builder for WebdavBuilder { Some(v) => v, None => { return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") - .with_context("service", Scheme::Webdav)) + .with_context("service", Scheme::Webdav)); } }; @@ -300,9 +300,7 @@ impl Accessor for WebdavBackend { async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let resp = self.webdav_get(path, args.range()).await?; - let status = resp.status(); - match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => { let meta = parse_into_metadata(path, resp.headers())?; @@ -451,7 +449,6 @@ impl WebdavBackend { range: BytesRange, ) -> Result> { let p = build_rooted_abs_path(&self.root, path); - let url: String = format!("{}{}", self.endpoint, percent_encode_path(&p)); let mut req = Request::get(&url); @@ -468,7 +465,7 @@ impl WebdavBackend { .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - self.client.send(req).await + self.client.send_with_redirect(req, 5).await } pub async fn webdav_put( diff --git a/core/src/services/webdav/fixtures/nginx.conf b/core/src/services/webdav/fixtures/nginx.conf index 456fdf59af36..1ba753cfe12c 100644 --- a/core/src/services/webdav/fixtures/nginx.conf +++ b/core/src/services/webdav/fixtures/nginx.conf @@ -8,6 +8,23 @@ events { } http { + # the following configuration is for redirect test + server { + listen 127.0.0.1:8081; + server_name localhost; + access_log /tmp/forward-access.log; + error_log /tmp/forward-error.log; + + location / { + if ($request_method = GET) { + return 302 http://$server_name:8080$request_uri; + } + client_max_body_size 1024M; + # forward all other requests to port 8080 + proxy_pass http://127.0.0.1:8080; + } + } + server { listen 127.0.0.1:8080; server_name localhost;