Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/python/python/opendal/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Error(Exception): ...

class Operator:
def __init__(self, scheme: str, **kwargs): ...
def layer(self, layer: Layer): ...
def read(self, path: str) -> memoryview: ...
def open_reader(self, path: str) -> Reader: ...
def write(
Expand All @@ -43,6 +44,7 @@ class Operator:

class AsyncOperator:
def __init__(self, scheme: str, **kwargs): ...
def layer(self, layer: Layer): ...
async def read(self, path: str) -> memoryview: ...
def open_reader(self, path: str) -> AsyncReader: ...
async def write(
Expand Down
6 changes: 0 additions & 6 deletions bindings/python/python/opendal/layers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@
# specific language governing permissions and limitations
# under the License.

class ConcurrentLimitLayer:
def __init__(self, permits: int) -> None: ...

class ImmutableIndexLayer:
def insert(self, key: str) -> None: ...

class RetryLayer:
def __init__(
self,
Expand Down
12 changes: 9 additions & 3 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator);
#[pymethods]
impl AsyncOperator {
#[new]
#[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) -> PyResult<Self> {
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
let scheme = od::Scheme::from_str(scheme)
.map_err(|err| {
od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err)
Expand All @@ -71,7 +71,13 @@ impl AsyncOperator {
})
.unwrap_or_default();

Ok(AsyncOperator(build_operator(scheme, map, layers, false)?))
Ok(AsyncOperator(build_operator(scheme, map, false)?))
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone());
Ok(Self(op))
}

/// Read the whole path into bytes.
Expand Down
53 changes: 18 additions & 35 deletions bindings/python/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,26 @@
use std::time::Duration;

use ::opendal as od;
use opendal::Operator;
use pyo3::prelude::*;

#[derive(FromPyObject)]
pub enum Layer {
ConcurrentLimit(ConcurrentLimitLayer),
ImmutableIndex(ImmutableIndexLayer),
Retry(RetryLayer),
pub trait PythonLayer: Send + Sync {
fn layer(&self, op: Operator) -> Operator;
}

#[pyclass(module = "opendal.layers")]
#[derive(Clone)]
pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer);
#[pyclass(module = "opendal.layers", subclass)]
pub struct Layer(pub Box<dyn PythonLayer>);
Comment thread
messense marked this conversation as resolved.

#[pymethods]
impl ConcurrentLimitLayer {
#[new]
fn new(permits: usize) -> Self {
Self(od::layers::ConcurrentLimitLayer::new(permits))
}
}

#[pyclass(module = "opendal.layers")]
#[pyclass(module = "opendal.layers", extends=Layer)]
#[derive(Clone)]
pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer);

#[pymethods]
impl ImmutableIndexLayer {
#[new]
fn new() -> Self {
Self(od::layers::ImmutableIndexLayer::default())
}
pub struct RetryLayer(od::layers::RetryLayer);

fn insert(&mut self, key: String) {
self.0.insert(key);
impl PythonLayer for RetryLayer {
fn layer(&self, op: Operator) -> Operator {
op.layer(self.0.clone())
}
}

#[pyclass(module = "opendal.layers")]
#[derive(Clone)]
pub struct RetryLayer(pub od::layers::RetryLayer);

#[pymethods]
impl RetryLayer {
#[new]
Expand All @@ -75,7 +54,7 @@ impl RetryLayer {
jitter: bool,
max_delay: Option<f64>,
min_delay: Option<f64>,
) -> PyResult<Self> {
) -> PyResult<PyClassInitializer<Self>> {
let mut retry = od::layers::RetryLayer::default();
if let Some(max_times) = max_times {
retry = retry.with_max_times(max_times);
Expand All @@ -92,14 +71,18 @@ impl RetryLayer {
if let Some(min_delay) = min_delay {
retry = retry.with_min_delay(Duration::from_micros((min_delay * 1000000.0) as u64));
}
Ok(Self(retry))

let retry_layer = Self(retry);
let class = PyClassInitializer::from(Layer(Box::new(retry_layer.clone())))
.add_subclass(retry_layer);

Ok(class)
}
}

pub fn create_submodule(py: Python) -> PyResult<&PyModule> {
let submod = PyModule::new(py, "layers")?;
submod.add_class::<ConcurrentLimitLayer>()?;
submod.add_class::<ImmutableIndexLayer>()?;
submod.add_class::<Layer>()?;
Comment thread
Xuanwo marked this conversation as resolved.
submod.add_class::<RetryLayer>()?;
Ok(submod)
}
38 changes: 13 additions & 25 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,9 @@ impl From<Vec<u8>> for Buffer {
}
}

fn add_layers(mut op: od::Operator, layers: Vec<layers::Layer>) -> PyResult<od::Operator> {
for layer in layers {
match layer {
layers::Layer::Retry(layers::RetryLayer(inner)) => op = op.layer(inner),
layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner)) => {
op = op.layer(inner)
}
layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => {
op = op.layer(inner)
}
}
}
Ok(op)
}

fn build_operator(
scheme: od::Scheme,
map: HashMap<String, String>,
layers: Vec<layers::Layer>,
blocking: bool,
) -> PyResult<od::Operator> {
let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?;
Expand All @@ -110,7 +94,7 @@ fn build_operator(
op = op.layer(od::layers::BlockingLayer::create().expect("blocking layer must be created"));
}

add_layers(op, layers)
Ok(op)
}

/// `Operator` is the entry for all public blocking APIs
Expand All @@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator);
#[pymethods]
impl Operator {
#[new]
#[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) -> PyResult<Self> {
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
let scheme = od::Scheme::from_str(scheme)
.map_err(|err| {
od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err)
Expand All @@ -136,9 +120,13 @@ impl Operator {
})
.unwrap_or_default();

Ok(Operator(
build_operator(scheme, map, layers, true)?.blocking(),
))
Ok(Operator(build_operator(scheme, map, true)?.blocking()))
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone().into());
Ok(Self(op.blocking()))
}

/// Read the whole path into bytes.
Expand Down Expand Up @@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<capability::Capability>()?;
m.add("Error", py.get_type::<Error>())?;

let layers = layers::create_submodule(py)?;
m.add_submodule(layers)?;
let layers_module = layers::create_submodule(py)?;
m.add_submodule(layers_module)?;
py.import("sys")?
.getattr("modules")?
.set_item("opendal.layers", layers)?;
.set_item("opendal.layers", layers_module)?;

Ok(())
}
4 changes: 2 additions & 2 deletions bindings/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ def setup_config(service_name):

@pytest.fixture()
def operator(service_name, setup_config):
return opendal.Operator(service_name, **setup_config)
return opendal.Operator(service_name, **setup_config).layer(opendal.layers.RetryLayer())


@pytest.fixture()
def async_operator(service_name, setup_config):
return opendal.AsyncOperator(service_name, **setup_config)
return opendal.AsyncOperator(service_name, **setup_config).layer(opendal.layers.RetryLayer())


@pytest.fixture(autouse=True)
Expand Down
11 changes: 11 additions & 0 deletions bindings/python/upgrade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Unreleased

## Breaking change for layers

Operator and BlockingOperator won't accept `layers` anymore. Instead, we provide a `layer` API:

```python
op = opendal.Operator("memory").layer(opendal.layers.RetryLayer())
```

We removed not used layers `ConcurrentLimitLayer` and `ImmutableIndexLayer` along with this change.
6 changes: 6 additions & 0 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,3 +1140,9 @@ impl BlockingOperator {
))
}
}

impl From<BlockingOperator> for Operator {
fn from(v: BlockingOperator) -> Self {
Operator::from_inner(v.accessor).with_limit(v.limit)
}
}