diff --git a/.env.example b/.env.example index 08b3c255eaa9..79211cb97a46 100644 --- a/.env.example +++ b/.env.example @@ -164,6 +164,14 @@ OPENDAL_GDRIVE_ACCESS_TOKEN= OPENDAL_GDRIVE_REFRESH_TOKEN= OPENDAL_GDRIVE_CLIENT_ID= OPENDAL_GDRIVE_CLIENT_SECRET= +# libsql +OPENDAL_LIBSQL_TEST=false +OPENDAL_LIBSQL_ROOT=/tmp/opendal/ +OPENDAL_LIBSQL_CONNECTION_STRING=https://example.com/db +OPENDAL_LIBSQL_AUTH_TOKEN= +OPENDAL_LIBSQL_TABLE=t_opendal +OPENDAL_LIBSQL_KEY_FIELD=key +OPENDAL_LIBSQL_VALUE_FIELD=val # sqlite OPENDAL_SQLITE_TEST=on OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db diff --git a/.github/workflows/service_test_libsql.yml b/.github/workflows/service_test_libsql.yml new file mode 100644 index 000000000000..68ac2124c6ca --- /dev/null +++ b/.github/workflows/service_test_libsql.yml @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Service Test Libsql + +on: + push: + branches: + - main + pull_request: + branches: + - main + paths: + - "core/src/**" + - "core/tests/**" + - "!core/src/docs/**" + - "!core/src/services/**" + - "core/src/services/libsql/**" + - ".github/workflows/service_test_libsql.yml" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + libsql: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Setup libsql Server + shell: bash + working-directory: fixtures/libsql + run: docker-compose -f docker-compose.yml up -d + + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + need-nextest: true + + - name: Create table + shell: bash + working-directory: core + run: | + curl --location '127.0.0.1:8080/v2/pipeline' \ + --header 'Content-Type: application/json' \ + --data '{ + "baton": null, + "requests": [ + { + "type": "execute", + "stmt": { + "sql": "CREATE TABLE IF NOT EXISTS `data` (`key` TEXT PRIMARY KEY NOT NULL CHECK(length(key) <= 255),`data` BLOB);", + "args": [], + "want_rows": true + } + } + ] + }' + + - name: Test + shell: bash + working-directory: core + run: cargo nextest run libsql --features services-libsql + env: + OPENDAL_LIBSQL_TEST: on + OPENDAL_LIBSQL_CONNECTION_STRING: http://127.0.0.1:8080 + OPENDAL_LIBSQL_TABLE: data + OPENDAL_LIBSQL_KEY_FIELD: key + OPENDAL_LIBSQL_VALUE_FIELD: data + + libsql-auth: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Setup libsql-auth Server + shell: bash + working-directory: fixtures/libsql + run: docker-compose -f docker-compose-auth.yml up -d + + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + need-nextest: true + + - name: Create table + shell: bash + working-directory: core + run: | + curl --location '127.0.0.1:8080/v2/pipeline' \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjc5ODg0ODM4Mjd9.MatB2aLnPFusagqH2RMoVExP37o2GFLmaJbmd52OdLtAehRNeqeJZPrefP1t2GBFidApUTLlaBRL6poKq_s3CQ' \ + --data '{ + "baton": null, + "requests": [ + { + "type": "execute", + "stmt": { + "sql": "CREATE TABLE IF NOT EXISTS `data` (`key` TEXT PRIMARY KEY NOT NULL CHECK(length(key) <= 255),`data` BLOB);", + "args": [], + "want_rows": true + } + } + ] + }' + + - name: Test + shell: bash + working-directory: core + run: cargo nextest run libsql --features services-libsql + env: + OPENDAL_LIBSQL_TEST: on + OPENDAL_LIBSQL_CONNECTION_STRING: http://127.0.0.1:8080 + OPENDAL_LIBSQL_AUTH_TOKEN: eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjc5ODg0ODM4Mjd9.MatB2aLnPFusagqH2RMoVExP37o2GFLmaJbmd52OdLtAehRNeqeJZPrefP1t2GBFidApUTLlaBRL6poKq_s3CQ + OPENDAL_LIBSQL_TABLE: data + OPENDAL_LIBSQL_KEY_FIELD: key + OPENDAL_LIBSQL_VALUE_FIELD: data diff --git a/Cargo.lock b/Cargo.lock index 601e740cdb81..d5f269a7b170 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,7 +212,7 @@ checksum = "840d2e9edec91ac974365978efc6f00781ff497e706a12306fff29ae92f8ad46" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -316,7 +316,7 @@ checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -364,7 +364,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -394,7 +394,7 @@ checksum = "7b2d0f03b3640e3a630367e40c468cb7f309529c708ed1d88597047b0e7c6ef7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -649,7 +649,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.23", + "syn 2.0.32", "which", ] @@ -1058,7 +1058,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -1383,7 +1383,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eed5fff0d93c7559121e9c72bf9c242295869396255071ff2cb1617147b608c5" dependencies = [ "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -1416,7 +1416,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -1433,7 +1433,7 @@ checksum = "2fa16a70dd58129e4dfffdff535fb1bce66673f7bbeec4a5a1765a504e1ccd84" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -1481,7 +1481,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -1503,7 +1503,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -1628,7 +1628,7 @@ checksum = "53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -2051,7 +2051,7 @@ checksum = "83c8d52fe8b46ab822b4decdcc0d6d85aeedfc98f0d52ba2bd4aec4a97807516" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", "try_map", ] @@ -2089,7 +2089,7 @@ checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e" dependencies = [ "frunk_proc_macro_helpers", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -2101,7 +2101,7 @@ dependencies = [ "frunk_core", "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -2113,7 +2113,7 @@ dependencies = [ "frunk_core", "frunk_proc_macro_helpers", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -2203,7 +2203,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -2504,6 +2504,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "hrana-client-proto" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16b4e41e289da3fd60e64f245246a97e78fab7b3788c6d8147b3ae7d9f5e533" +dependencies = [ + "anyhow", + "base64 0.21.2", + "serde", + "serde_json", +] + [[package]] name = "htmlescape" version = "0.3.1" @@ -2912,7 +2924,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -3361,7 +3373,7 @@ checksum = "4901771e1d44ddb37964565c654a3223ba41a594d02b8da471cc4464912b5cfa" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -3543,7 +3555,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", "termcolor", "thiserror", ] @@ -4019,6 +4031,7 @@ dependencies = [ "futures", "governor", "hdrs", + "hrana-client-proto", "http", "hyper", "lazy-regex 3.0.1", @@ -4309,7 +4322,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -4701,7 +4714,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -4897,7 +4910,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9825a04601d60621feed79c4e6b56d65db77cdca55cef43b46b0de1096d1c282" dependencies = [ "proc-macro2", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -5019,7 +5032,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -6003,9 +6016,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.166" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d01b7404f9d441d3ad40e6a636a7782c377d2abdbe4fa2440e2edcc2f4f10db8" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" dependencies = [ "serde_derive", ] @@ -6021,13 +6034,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.166" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd83d6dde2b6b2d466e14d9d1acce8816dedee94f735eac6395808b3483c6d6" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -6420,9 +6433,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.23" +version = "2.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737" +checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" dependencies = [ "proc-macro2", "quote", @@ -6495,22 +6508,22 @@ checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8" [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -6705,7 +6718,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -6918,9 +6931,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.1" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8bd22a874a2d0b70452d5597b12c537331d49060824a95f49f108994f94aa4c" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ "bitflags 2.3.3", "bytes", @@ -6968,7 +6981,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", ] [[package]] @@ -7353,7 +7366,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", "wasm-bindgen-shared", ] @@ -7387,7 +7400,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.32", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 5b1e7b5b53ad..c5ca1b20cd75 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -137,6 +137,7 @@ services-hdfs = ["dep:hdrs"] services-http = [] services-ipfs = ["dep:prost"] services-ipmfs = [] +services-libsql = ["dep:hrana-client-proto"] services-memcached = ["dep:bb8"] services-memory = [] services-mini-moka = ["dep:mini-moka"] @@ -228,6 +229,7 @@ governor = { version = "0.5", optional = true, features = ["std"] } hdrs = { version = "0.3.0", optional = true, features = ["async_file"] } http = "0.2.9" hyper = "0.14" +hrana-client-proto = { version = "0.2.1", optional = true } lazy-regex = { version = "3.0.1", optional = true } log = "0.4" madsim = { version = "0.2.21", optional = true } diff --git a/core/src/services/libsql/backend.rs b/core/src/services/libsql/backend.rs new file mode 100644 index 000000000000..db1291485e5e --- /dev/null +++ b/core/src/services/libsql/backend.rs @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::str; + +use async_trait::async_trait; +use bytes::Bytes; +use hrana_client_proto::pipeline::{ + ClientMsg, Response, ServerMsg, StreamExecuteReq, StreamExecuteResult, StreamRequest, + StreamResponse, StreamResponseError, StreamResponseOk, +}; +use hrana_client_proto::Error as PipelineError; +use hrana_client_proto::{Stmt, StmtResult, Value}; +use http::{Request, Uri}; + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::*; + +use super::error::parse_error; + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct LibsqlBuilder { + connection_string: Option, + auth_token: Option, + + table: Option, + key_field: Option, + value_field: Option, + root: Option, +} + +impl Debug for LibsqlBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("LibsqlBuilder"); + ds.field("connection_string", &self.connection_string) + .field("table", &self.table) + .field("key_field", &self.key_field) + .field("value_field", &self.value_field) + .field("root", &self.root); + + if self.auth_token.is_some() { + ds.field("auth_token", &""); + } + ds.finish() + } +} + +impl LibsqlBuilder { + /// Set the connection_string of the libsql service. + /// + /// This connection string is used to connect to the libsql service. There are url based formats: + /// + /// ## Url + /// + /// This format resembles the url format of the libsql client. + /// + /// for a remote database connection: + /// + /// - `http://example.com/db` + /// - `https://example.com/db` + /// - `libsql://example.com/db` + pub fn connection_string(&mut self, v: &str) -> &mut Self { + if !v.is_empty() { + self.connection_string = Some(v.to_string()); + } + self + } + + /// set the authentication token for libsql service. + /// + /// default: no authentication token + pub fn auth_token(&mut self, auth_token: &str) -> &mut Self { + if !auth_token.is_empty() { + self.auth_token = Some(auth_token.to_owned()); + } + self + } + + /// set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_string()); + } + self + } + + /// Set the table name of the libsql service to read/write. + pub fn table(&mut self, table: &str) -> &mut Self { + if !table.is_empty() { + self.table = Some(table.to_string()); + } + self + } + + /// Set the key field name of the libsql service to read/write. + /// + /// Default to `key` if not specified. + pub fn key_field(&mut self, key_field: &str) -> &mut Self { + if !key_field.is_empty() { + self.key_field = Some(key_field.to_string()); + } + self + } + + /// Set the value field name of the libsql service to read/write. + /// + /// Default to `value` if not specified. + pub fn value_field(&mut self, value_field: &str) -> &mut Self { + if !value_field.is_empty() { + self.value_field = Some(value_field.to_string()); + } + self + } +} + +impl Builder for LibsqlBuilder { + const SCHEME: Scheme = Scheme::Libsql; + type Accessor = LibsqlBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = LibsqlBuilder::default(); + map.get("connection_string") + .map(|v| builder.connection_string(v)); + map.get("auth_token").map(|v| builder.auth_token(v)); + map.get("table").map(|v| builder.table(v)); + map.get("key_field").map(|v| builder.key_field(v)); + map.get("value_field").map(|v| builder.value_field(v)); + map.get("root").map(|v| builder.root(v)); + builder + } + + fn build(&mut self) -> Result { + let conn = self.get_connection_string()?; + + let table = match self.table.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty") + .with_context("service", Scheme::Libsql)) + } + }; + let key_field = match self.key_field.clone() { + Some(v) => v, + None => "key".to_string(), + }; + let value_field = match self.value_field.clone() { + Some(v) => v, + None => "value".to_string(), + }; + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + let client = HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Libsql) + })?; + + Ok(LibsqlBackend::new(Adapter { + client, + connection_string: conn, + auth_token: self.auth_token.clone(), + table, + key_field, + value_field, + }) + .with_root(&root)) + } +} + +impl LibsqlBuilder { + fn get_connection_string(&self) -> Result { + let connection_string = self + .connection_string + .clone() + .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "connection_string is empty"))?; + + let ep_url = connection_string + .replace("libsql://", "https://") + .parse::() + .map_err(|e| { + Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid") + .with_context("service", Scheme::Libsql) + .with_context("connection_string", connection_string) + .set_source(e) + })?; + + match ep_url.scheme_str() { + None => Ok(format!("https://{ep_url}/")), + Some("http") | Some("https") => Ok(ep_url.to_string()), + Some(s) => Err( + Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme") + .with_context("service", Scheme::Libsql) + .with_context("scheme", s), + ), + } + } +} + +/// Backend for libsql service +pub type LibsqlBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + client: HttpClient, + connection_string: String, + auth_token: Option, + + table: String, + key_field: String, + value_field: String, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("LibsqlAdapter"); + ds.field("connection_string", &self.connection_string) + .field("table", &self.table) + .field("key_field", &self.key_field) + .field("value_field", &self.value_field); + + if self.auth_token.is_some() { + ds.field("auth_token", &""); + } + + ds.finish() + } +} + +impl Adapter { + async fn execute(&self, sql: String, args: Vec) -> Result { + let url = format!("{}v2/pipeline", self.connection_string); + + let mut req = Request::post(&url); + + if let Some(auth_token) = self.auth_token.clone() { + req = req.header("Authorization", format!("Bearer {}", auth_token)); + } + + let msg = ClientMsg { + baton: None, + requests: vec![StreamRequest::Execute(StreamExecuteReq { + stmt: Stmt { + sql, + args, + named_args: vec![], + want_rows: true, + }, + })], + }; + let body = serde_json::to_string(&msg).map_err(|err| { + Error::new(ErrorKind::Unexpected, "failed to serialize request") + .with_context("service", Scheme::Libsql) + .set_source(err) + })?; + + let req = req + .body(AsyncBody::Bytes(Bytes::from(body))) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + + if resp.status() != http::StatusCode::OK { + return Err(parse_error(resp).await?); + } + + let bs = resp.into_body().bytes().await?; + + let resp: ServerMsg = serde_json::from_slice(&bs).map_err(|e| { + Error::new(ErrorKind::Unexpected, "deserialize json from response").set_source(e) + })?; + + if resp.results.is_empty() { + return Err(Error::new( + ErrorKind::Unexpected, + "Unexpected empty response from server", + )); + } + + if resp.results.len() > 1 { + return Err(Error::new( + ErrorKind::Unexpected, + "Unexpected multiple response from server", + )); + } + + Ok(resp) + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::Libsql, + &self.table, + Capability { + read: true, + write: true, + create_dir: true, + delete: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + let query = format!( + "SELECT {} FROM {} WHERE `{}` = ? LIMIT 1", + self.value_field, self.table, self.key_field + ); + let mut resp = self.execute(query, vec![Value::from(path)]).await?; + + match resp.results.swap_remove(0) { + Response::Ok(StreamResponseOk { + response: + StreamResponse::Execute(StreamExecuteResult { + result: StmtResult { cols: _, rows, .. }, + }), + }) => { + if rows.is_empty() || rows[0].is_empty() { + return Ok(None); + } else { + let val = &rows[0][0]; + match val { + Value::Null => Ok(None), + Value::Blob { value } => Ok(Some(value.to_owned())), + _ => Err(Error::new(ErrorKind::Unexpected, "invalid value type")), + } + } + } + Response::Ok(_) => Err(Error::new( + ErrorKind::Unexpected, + "Unexpected response from server", + )), + Response::Error(StreamResponseError { + error: PipelineError { message }, + }) => Err(Error::new( + ErrorKind::Unexpected, + format!("get failed: {}", message).as_str(), + )), + } + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let query = format!( + "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES (?, ?)", + self.table, self.key_field, self.value_field + ); + let mut resp = self + .execute(query, vec![Value::from(path), Value::from(value.to_vec())]) + .await?; + match resp.results.swap_remove(0) { + Response::Ok(_) => Ok(()), + Response::Error(StreamResponseError { + error: PipelineError { message }, + }) => Err(Error::new( + ErrorKind::Unexpected, + format!("set failed: {}", message).as_str(), + )), + } + } + + async fn delete(&self, path: &str) -> Result<()> { + let query = format!("DELETE FROM {} WHERE `{}` = ?", self.table, self.key_field); + let mut resp = self.execute(query, vec![Value::from(path)]).await?; + match resp.results.swap_remove(0) { + Response::Ok(_) => Ok(()), + Response::Error(StreamResponseError { + error: PipelineError { message }, + }) => Err(Error::new( + ErrorKind::Unexpected, + format!("delete failed: {}", message).as_str(), + )), + } + } +} diff --git a/core/src/services/libsql/docs.md b/core/src/services/libsql/docs.md new file mode 100644 index 000000000000..00d0408c12c7 --- /dev/null +++ b/core/src/services/libsql/docs.md @@ -0,0 +1,50 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [ ] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `connection_string`: Set the connection string for libsql server +- `auth_token`: Set the authentication token for libsql server +- `table`: Set the table of libsql +- `key_field`: Set the key field of libsql +- `value_field`: Set the value field of libsql + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Libsql; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Libsql::default(); + builder.root("/"); + builder.connection_string("https://example.com/db"); + builder.auth_token("secret"); + builder.table("your_table"); + // key field type in the table should be compatible with Rust's &str like text + builder.key_field("key"); + // value field type in the table should be compatible with Rust's Vec like bytea + builder.value_field("value"); + + let op = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/libsql/error.rs b/core/src/services/libsql/error.rs new file mode 100644 index 000000000000..a2a80a02ec10 --- /dev/null +++ b/core/src/services/libsql/error.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use http::Response; +use http::StatusCode; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { + (ErrorKind::ConditionNotMatch, false) + } + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let mut message = String::from_utf8_lossy(&bs).into_owned(); + + // If there is no body here, fill with http response code. + if message.is_empty() { + message = format!("Error response code: {}", parts.status); + } + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} diff --git a/core/src/services/libsql/mod.rs b/core/src/services/libsql/mod.rs new file mode 100644 index 000000000000..1b0d623c4e4a --- /dev/null +++ b/core/src/services/libsql/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::LibsqlBuilder as Libsql; + +mod error; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 4e00c19fcff8..5293aaa443e9 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -84,6 +84,11 @@ mod ipmfs; #[cfg(feature = "services-ipmfs")] pub use ipmfs::Ipmfs; +#[cfg(feature = "services-libsql")] +mod libsql; +#[cfg(feature = "services-libsql")] +pub use libsql::Libsql; + #[cfg(feature = "services-memcached")] mod memcached; #[cfg(feature = "services-memcached")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 84480bf68b74..ec3298b7bd59 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -88,6 +88,8 @@ pub enum Scheme { Redis, /// [postgresql][crate::services::Postgresql]: Postgresql services Postgresql, + /// [libsql][crate::services::Libsql]: Libsql services + Libsql, /// [mysql][crate::services::Mysql]: Mysql services Mysql, /// [sqlite][crate::services::Sqlite]: Sqlite services @@ -169,6 +171,7 @@ impl FromStr for Scheme { "ftp" | "ftps" => Ok(Scheme::Ftp), "ipfs" | "ipns" => Ok(Scheme::Ipfs), "ipmfs" => Ok(Scheme::Ipmfs), + "libsql" => Ok(Scheme::Libsql), "memcached" => Ok(Scheme::Memcached), "memory" => Ok(Scheme::Memory), "mysql" => Ok(Scheme::Mysql), @@ -216,6 +219,7 @@ impl From for &'static str { Scheme::Ftp => "ftp", Scheme::Ipfs => "ipfs", Scheme::Ipmfs => "ipmfs", + Scheme::Libsql => "libsql", Scheme::Memcached => "memcached", Scheme::Memory => "memory", Scheme::MiniMoka => "mini_moka", diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index f6a5e2df43ba..12f24e25f139 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -132,6 +132,8 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::()); #[cfg(feature = "services-ipmfs")] tests.extend(behavior_test::()); + #[cfg(feature = "services-libsql")] + tests.extend(behavior_test::()); #[cfg(feature = "services-memcached")] tests.extend(behavior_test::()); #[cfg(feature = "services-memory")] diff --git a/fixtures/libsql/docker-compose-auth.yml b/fixtures/libsql/docker-compose-auth.yml new file mode 100644 index 000000000000..d14214a0a597 --- /dev/null +++ b/fixtures/libsql/docker-compose-auth.yml @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + libsql: + image: ghcr.io/libsql/sqld:v0.21.9 + ports: + - '8080:8080' + environment: + - 'SQLD_MAX_RESPONSE_SIZE=20971520' + - 'SQLD_MAX_TOTAL_RESPONSE_SIZE=209715200' + - 'SQLD_AUTH_JWT_KEY=zaMv-aFGmB7PXkjM4IrMdF6B5zCYEiEGXW3RgMjNAtc' diff --git a/fixtures/libsql/docker-compose.yml b/fixtures/libsql/docker-compose.yml new file mode 100644 index 000000000000..77304afcc15f --- /dev/null +++ b/fixtures/libsql/docker-compose.yml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + libsql: + image: ghcr.io/libsql/sqld:v0.21.9 + ports: + - '8080:8080' + environment: + - 'SQLD_MAX_RESPONSE_SIZE=20971520' + - 'SQLD_MAX_TOTAL_RESPONSE_SIZE=209715200'