From cbca2592185384aef4c87c01048e0406b6613fb0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 2 Nov 2023 14:41:14 +0800 Subject: [PATCH 1/3] feat(bindings/python): Add layer API for operator Signed-off-by: Xuanwo --- bindings/python/python/opendal/__init__.pyi | 2 + bindings/python/python/opendal/layers.pyi | 6 --- bindings/python/src/asyncio.rs | 12 +++-- bindings/python/src/layers.rs | 51 ++++++-------------- bindings/python/src/lib.rs | 38 +++++---------- bindings/python/tests/conftest.py | 4 +- core/src/types/operator/blocking_operator.rs | 6 +++ 7 files changed, 48 insertions(+), 71 deletions(-) diff --git a/bindings/python/python/opendal/__init__.pyi b/bindings/python/python/opendal/__init__.pyi index df8134e3e9ab..4d7f8a7eec8a 100644 --- a/bindings/python/python/opendal/__init__.pyi +++ b/bindings/python/python/opendal/__init__.pyi @@ -21,6 +21,7 @@ class Error(Exception): ... class Operator: def __init__(self, scheme: str, **kwargs): ... + def layer(self, layer: Layer): ... def read(self, path: str) -> memoryview: ... def open_reader(self, path: str) -> Reader: ... def write( @@ -43,6 +44,7 @@ class Operator: class AsyncOperator: def __init__(self, scheme: str, **kwargs): ... + def layer(self, layer: Layer): ... async def read(self, path: str) -> memoryview: ... def open_reader(self, path: str) -> AsyncReader: ... async def write( diff --git a/bindings/python/python/opendal/layers.pyi b/bindings/python/python/opendal/layers.pyi index 6ae35f50ea53..bd2bc2e0231c 100644 --- a/bindings/python/python/opendal/layers.pyi +++ b/bindings/python/python/opendal/layers.pyi @@ -15,12 +15,6 @@ # specific language governing permissions and limitations # under the License. -class ConcurrentLimitLayer: - def __init__(self, permits: int) -> None: ... - -class ImmutableIndexLayer: - def insert(self, key: str) -> None: ... - class RetryLayer: def __init__( self, diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs index 2ebd99dd260a..12c9eefee903 100644 --- a/bindings/python/src/asyncio.rs +++ b/bindings/python/src/asyncio.rs @@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator); #[pymethods] impl AsyncOperator { #[new] - #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))] - pub fn new(scheme: &str, layers: Vec, map: Option<&PyDict>) -> PyResult { + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult { let scheme = od::Scheme::from_str(scheme) .map_err(|err| { od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) @@ -71,7 +71,13 @@ impl AsyncOperator { }) .unwrap_or_default(); - Ok(AsyncOperator(build_operator(scheme, map, layers, false)?)) + Ok(AsyncOperator(build_operator(scheme, map, false)?)) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult { + let op = layer.0.layer(self.0.clone()); + Ok(Self(op)) } /// Read the whole path into bytes. diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs index 1e7fe72f6af2..52b33d559021 100644 --- a/bindings/python/src/layers.rs +++ b/bindings/python/src/layers.rs @@ -18,47 +18,26 @@ use std::time::Duration; use ::opendal as od; +use opendal::Operator; use pyo3::prelude::*; -#[derive(FromPyObject)] -pub enum Layer { - ConcurrentLimit(ConcurrentLimitLayer), - ImmutableIndex(ImmutableIndexLayer), - Retry(RetryLayer), +pub trait PythonLayer: Send + Sync { + fn layer(&self, op: Operator) -> Operator; } -#[pyclass(module = "opendal.layers")] -#[derive(Clone)] -pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer); +#[pyclass(module = "opendal.layers", subclass)] +pub struct Layer(pub Box); -#[pymethods] -impl ConcurrentLimitLayer { - #[new] - fn new(permits: usize) -> Self { - Self(od::layers::ConcurrentLimitLayer::new(permits)) - } -} - -#[pyclass(module = "opendal.layers")] +#[pyclass(module = "opendal.layers", extends=Layer)] #[derive(Clone)] -pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer); - -#[pymethods] -impl ImmutableIndexLayer { - #[new] - fn new() -> Self { - Self(od::layers::ImmutableIndexLayer::default()) - } +pub struct RetryLayer(od::layers::RetryLayer); - fn insert(&mut self, key: String) { - self.0.insert(key); +impl PythonLayer for RetryLayer { + fn layer(&self, op: Operator) -> Operator { + op.layer(self.0.clone()) } } -#[pyclass(module = "opendal.layers")] -#[derive(Clone)] -pub struct RetryLayer(pub od::layers::RetryLayer); - #[pymethods] impl RetryLayer { #[new] @@ -75,7 +54,7 @@ impl RetryLayer { jitter: bool, max_delay: Option, min_delay: Option, - ) -> PyResult { + ) -> PyResult<(Self, Layer)> { let mut retry = od::layers::RetryLayer::default(); if let Some(max_times) = max_times { retry = retry.with_max_times(max_times); @@ -92,14 +71,16 @@ impl RetryLayer { if let Some(min_delay) = min_delay { retry = retry.with_min_delay(Duration::from_micros((min_delay * 1000000.0) as u64)); } - Ok(Self(retry)) + + let retry_layer = Self(retry); + + Ok((retry_layer.clone(), Layer(Box::new(retry_layer)))) } } pub fn create_submodule(py: Python) -> PyResult<&PyModule> { let submod = PyModule::new(py, "layers")?; - submod.add_class::()?; - submod.add_class::()?; + submod.add_class::()?; submod.add_class::()?; Ok(submod) } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 8271c483a5cb..dee9db8f4a4b 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -82,25 +82,9 @@ impl From> for Buffer { } } -fn add_layers(mut op: od::Operator, layers: Vec) -> PyResult { - for layer in layers { - match layer { - layers::Layer::Retry(layers::RetryLayer(inner)) => op = op.layer(inner), - layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner)) => { - op = op.layer(inner) - } - layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => { - op = op.layer(inner) - } - } - } - Ok(op) -} - fn build_operator( scheme: od::Scheme, map: HashMap, - layers: Vec, blocking: bool, ) -> PyResult { let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?; @@ -110,7 +94,7 @@ fn build_operator( op = op.layer(od::layers::BlockingLayer::create().expect("blocking layer must be created")); } - add_layers(op, layers) + Ok(op) } /// `Operator` is the entry for all public blocking APIs @@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator); #[pymethods] impl Operator { #[new] - #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))] - pub fn new(scheme: &str, layers: Vec, map: Option<&PyDict>) -> PyResult { + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult { let scheme = od::Scheme::from_str(scheme) .map_err(|err| { od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) @@ -136,9 +120,13 @@ impl Operator { }) .unwrap_or_default(); - Ok(Operator( - build_operator(scheme, map, layers, true)?.blocking(), - )) + Ok(Operator(build_operator(scheme, map, true)?.blocking())) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult { + let op = layer.0.layer(self.0.clone().into()); + Ok(Self(op.blocking())) } /// Read the whole path into bytes. @@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add("Error", py.get_type::())?; - let layers = layers::create_submodule(py)?; - m.add_submodule(layers)?; + let layers_module = layers::create_submodule(py)?; + m.add_submodule(layers_module)?; py.import("sys")? .getattr("modules")? - .set_item("opendal.layers", layers)?; + .set_item("opendal.layers", layers_module)?; Ok(()) } diff --git a/bindings/python/tests/conftest.py b/bindings/python/tests/conftest.py index e4a88caa762f..bc831de6919c 100644 --- a/bindings/python/tests/conftest.py +++ b/bindings/python/tests/conftest.py @@ -56,12 +56,12 @@ def setup_config(service_name): @pytest.fixture() def operator(service_name, setup_config): - return opendal.Operator(service_name, **setup_config) + return opendal.Operator(service_name, **setup_config).layer(opendal.layers.RetryLayer()) @pytest.fixture() def async_operator(service_name, setup_config): - return opendal.AsyncOperator(service_name, **setup_config) + return opendal.AsyncOperator(service_name, **setup_config).layer(opendal.layers.RetryLayer()) @pytest.fixture(autouse=True) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index f55a1c48a82c..e8b8572da28b 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -1140,3 +1140,9 @@ impl BlockingOperator { )) } } + +impl From for Operator { + fn from(v: BlockingOperator) -> Self { + Operator::from_inner(v.accessor).with_limit(v.limit) + } +} From 4f128a12f2e381b6834e12ffb81a294def87b5a7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 2 Nov 2023 14:43:53 +0800 Subject: [PATCH 2/3] add upgrade Signed-off-by: Xuanwo --- bindings/python/upgrade.md | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 bindings/python/upgrade.md diff --git a/bindings/python/upgrade.md b/bindings/python/upgrade.md new file mode 100644 index 000000000000..91dc344807e2 --- /dev/null +++ b/bindings/python/upgrade.md @@ -0,0 +1,11 @@ +# Unreleased + +## Breaking change for layers + +Operator and BlockingOperator won't accept `layers` anymore. Instead, we provide a `layer` API: + +```python +op = opendal.Operator("memory").layer(opendal.layers.RetryLayer()) +``` + +We removed not used layers `ConcurrentLimitLayer` and `ImmutableIndexLayer` along with this change. From e82c681c00b662cb75de2c1bd3ec60993656fc48 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 2 Nov 2023 15:14:41 +0800 Subject: [PATCH 3/3] Use PyClassInitializer instead Signed-off-by: Xuanwo --- bindings/python/src/layers.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs index 52b33d559021..c3c9a8cc908f 100644 --- a/bindings/python/src/layers.rs +++ b/bindings/python/src/layers.rs @@ -54,7 +54,7 @@ impl RetryLayer { jitter: bool, max_delay: Option, min_delay: Option, - ) -> PyResult<(Self, Layer)> { + ) -> PyResult> { let mut retry = od::layers::RetryLayer::default(); if let Some(max_times) = max_times { retry = retry.with_max_times(max_times); @@ -73,8 +73,10 @@ impl RetryLayer { } let retry_layer = Self(retry); + let class = PyClassInitializer::from(Layer(Box::new(retry_layer.clone()))) + .add_subclass(retry_layer); - Ok((retry_layer.clone(), Layer(Box::new(retry_layer)))) + Ok(class) } }