diff --git a/bindings/python/python/opendal/__init__.pyi b/bindings/python/python/opendal/__init__.pyi index df8134e3e9ab..4d7f8a7eec8a 100644 --- a/bindings/python/python/opendal/__init__.pyi +++ b/bindings/python/python/opendal/__init__.pyi @@ -21,6 +21,7 @@ class Error(Exception): ... class Operator: def __init__(self, scheme: str, **kwargs): ... + def layer(self, layer: Layer): ... def read(self, path: str) -> memoryview: ... def open_reader(self, path: str) -> Reader: ... def write( @@ -43,6 +44,7 @@ class Operator: class AsyncOperator: def __init__(self, scheme: str, **kwargs): ... + def layer(self, layer: Layer): ... async def read(self, path: str) -> memoryview: ... def open_reader(self, path: str) -> AsyncReader: ... async def write( diff --git a/bindings/python/python/opendal/layers.pyi b/bindings/python/python/opendal/layers.pyi index 6ae35f50ea53..bd2bc2e0231c 100644 --- a/bindings/python/python/opendal/layers.pyi +++ b/bindings/python/python/opendal/layers.pyi @@ -15,12 +15,6 @@ # specific language governing permissions and limitations # under the License. -class ConcurrentLimitLayer: - def __init__(self, permits: int) -> None: ... - -class ImmutableIndexLayer: - def insert(self, key: str) -> None: ... - class RetryLayer: def __init__( self, diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs index 2ebd99dd260a..12c9eefee903 100644 --- a/bindings/python/src/asyncio.rs +++ b/bindings/python/src/asyncio.rs @@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator); #[pymethods] impl AsyncOperator { #[new] - #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))] - pub fn new(scheme: &str, layers: Vec, map: Option<&PyDict>) -> PyResult { + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult { let scheme = od::Scheme::from_str(scheme) .map_err(|err| { od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) @@ -71,7 +71,13 @@ impl AsyncOperator { }) .unwrap_or_default(); - Ok(AsyncOperator(build_operator(scheme, map, layers, false)?)) + Ok(AsyncOperator(build_operator(scheme, map, false)?)) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult { + let op = layer.0.layer(self.0.clone()); + Ok(Self(op)) } /// Read the whole path into bytes. diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs index 1e7fe72f6af2..c3c9a8cc908f 100644 --- a/bindings/python/src/layers.rs +++ b/bindings/python/src/layers.rs @@ -18,47 +18,26 @@ use std::time::Duration; use ::opendal as od; +use opendal::Operator; use pyo3::prelude::*; -#[derive(FromPyObject)] -pub enum Layer { - ConcurrentLimit(ConcurrentLimitLayer), - ImmutableIndex(ImmutableIndexLayer), - Retry(RetryLayer), +pub trait PythonLayer: Send + Sync { + fn layer(&self, op: Operator) -> Operator; } -#[pyclass(module = "opendal.layers")] -#[derive(Clone)] -pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer); +#[pyclass(module = "opendal.layers", subclass)] +pub struct Layer(pub Box); -#[pymethods] -impl ConcurrentLimitLayer { - #[new] - fn new(permits: usize) -> Self { - Self(od::layers::ConcurrentLimitLayer::new(permits)) - } -} - -#[pyclass(module = "opendal.layers")] +#[pyclass(module = "opendal.layers", extends=Layer)] #[derive(Clone)] -pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer); - -#[pymethods] -impl ImmutableIndexLayer { - #[new] - fn new() -> Self { - Self(od::layers::ImmutableIndexLayer::default()) - } +pub struct RetryLayer(od::layers::RetryLayer); - fn insert(&mut self, key: String) { - self.0.insert(key); +impl PythonLayer for RetryLayer { + fn layer(&self, op: Operator) -> Operator { + op.layer(self.0.clone()) } } -#[pyclass(module = "opendal.layers")] -#[derive(Clone)] -pub struct RetryLayer(pub od::layers::RetryLayer); - #[pymethods] impl RetryLayer { #[new] @@ -75,7 +54,7 @@ impl RetryLayer { jitter: bool, max_delay: Option, min_delay: Option, - ) -> PyResult { + ) -> PyResult> { let mut retry = od::layers::RetryLayer::default(); if let Some(max_times) = max_times { retry = retry.with_max_times(max_times); @@ -92,14 +71,18 @@ impl RetryLayer { if let Some(min_delay) = min_delay { retry = retry.with_min_delay(Duration::from_micros((min_delay * 1000000.0) as u64)); } - Ok(Self(retry)) + + let retry_layer = Self(retry); + let class = PyClassInitializer::from(Layer(Box::new(retry_layer.clone()))) + .add_subclass(retry_layer); + + Ok(class) } } pub fn create_submodule(py: Python) -> PyResult<&PyModule> { let submod = PyModule::new(py, "layers")?; - submod.add_class::()?; - submod.add_class::()?; + submod.add_class::()?; submod.add_class::()?; Ok(submod) } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 8271c483a5cb..dee9db8f4a4b 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -82,25 +82,9 @@ impl From> for Buffer { } } -fn add_layers(mut op: od::Operator, layers: Vec) -> PyResult { - for layer in layers { - match layer { - layers::Layer::Retry(layers::RetryLayer(inner)) => op = op.layer(inner), - layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner)) => { - op = op.layer(inner) - } - layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => { - op = op.layer(inner) - } - } - } - Ok(op) -} - fn build_operator( scheme: od::Scheme, map: HashMap, - layers: Vec, blocking: bool, ) -> PyResult { let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?; @@ -110,7 +94,7 @@ fn build_operator( op = op.layer(od::layers::BlockingLayer::create().expect("blocking layer must be created")); } - add_layers(op, layers) + Ok(op) } /// `Operator` is the entry for all public blocking APIs @@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator); #[pymethods] impl Operator { #[new] - #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))] - pub fn new(scheme: &str, layers: Vec, map: Option<&PyDict>) -> PyResult { + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult { let scheme = od::Scheme::from_str(scheme) .map_err(|err| { od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) @@ -136,9 +120,13 @@ impl Operator { }) .unwrap_or_default(); - Ok(Operator( - build_operator(scheme, map, layers, true)?.blocking(), - )) + Ok(Operator(build_operator(scheme, map, true)?.blocking())) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult { + let op = layer.0.layer(self.0.clone().into()); + Ok(Self(op.blocking())) } /// Read the whole path into bytes. @@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add("Error", py.get_type::())?; - let layers = layers::create_submodule(py)?; - m.add_submodule(layers)?; + let layers_module = layers::create_submodule(py)?; + m.add_submodule(layers_module)?; py.import("sys")? .getattr("modules")? - .set_item("opendal.layers", layers)?; + .set_item("opendal.layers", layers_module)?; Ok(()) } diff --git a/bindings/python/tests/conftest.py b/bindings/python/tests/conftest.py index e4a88caa762f..bc831de6919c 100644 --- a/bindings/python/tests/conftest.py +++ b/bindings/python/tests/conftest.py @@ -56,12 +56,12 @@ def setup_config(service_name): @pytest.fixture() def operator(service_name, setup_config): - return opendal.Operator(service_name, **setup_config) + return opendal.Operator(service_name, **setup_config).layer(opendal.layers.RetryLayer()) @pytest.fixture() def async_operator(service_name, setup_config): - return opendal.AsyncOperator(service_name, **setup_config) + return opendal.AsyncOperator(service_name, **setup_config).layer(opendal.layers.RetryLayer()) @pytest.fixture(autouse=True) diff --git a/bindings/python/upgrade.md b/bindings/python/upgrade.md new file mode 100644 index 000000000000..91dc344807e2 --- /dev/null +++ b/bindings/python/upgrade.md @@ -0,0 +1,11 @@ +# Unreleased + +## Breaking change for layers + +Operator and BlockingOperator won't accept `layers` anymore. Instead, we provide a `layer` API: + +```python +op = opendal.Operator("memory").layer(opendal.layers.RetryLayer()) +``` + +We removed not used layers `ConcurrentLimitLayer` and `ImmutableIndexLayer` along with this change. diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index f55a1c48a82c..e8b8572da28b 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -1140,3 +1140,9 @@ impl BlockingOperator { )) } } + +impl From for Operator { + fn from(v: BlockingOperator) -> Self { + Operator::from_inner(v.accessor).with_limit(v.limit) + } +}