Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 70 additions & 61 deletions crates/sqlite-libsql/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use anyhow::Context;
use async_trait::async_trait;
use spin_factor_sqlite::{Connection, QueryAsyncResult};
Expand All @@ -14,7 +12,7 @@ pub struct LazyLibSqlConnection {
// Since the libSQL client can only be created asynchronously, we wait until
// we're in the `Connection` implementation to create. Since we only want to do
// this once, we use a `OnceCell` to store it.
inner: OnceCell<Arc<LibSqlConnection>>,
inner: OnceCell<LibSqlConnection>,
}

impl LazyLibSqlConnection {
Expand All @@ -26,13 +24,12 @@ impl LazyLibSqlConnection {
}
}

pub async fn get_or_create_connection(&self) -> Result<&Arc<LibSqlConnection>, v3::Error> {
pub async fn get_or_create_connection(&self) -> Result<&LibSqlConnection, v3::Error> {
self.inner
.get_or_try_init(|| async {
LibSqlConnection::create(self.url.clone(), self.token.clone())
.await
.context("failed to create SQLite client")
.map(Arc::new)
})
.await
.map_err(|_| v3::Error::InvalidConnection)
Expand All @@ -57,7 +54,74 @@ impl Connection for LazyLibSqlConnection {
parameters: Vec<v3::Value>,
max_result_bytes: usize,
) -> Result<QueryAsyncResult, v3::Error> {
let client = self.get_or_create_connection().await?.clone();
self.get_or_create_connection()
.await?
.query_async(query, parameters, max_result_bytes)
.await
}

async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
let client = self.get_or_create_connection().await?;
client.execute_batch(statements).await
}

async fn changes(&self) -> Result<u64, sqlite::Error> {
let client = self.get_or_create_connection().await?;
Ok(client.changes())
}

async fn last_insert_rowid(&self) -> Result<i64, sqlite::Error> {
let client = self.get_or_create_connection().await?;
Ok(client.last_insert_rowid())
}

fn summary(&self) -> Option<String> {
Some(format!("libSQL at {}", self.url))
}
}

/// An open connection to a libSQL server.
#[derive(Clone)]
pub struct LibSqlConnection {
inner: libsql::Connection,
}

impl LibSqlConnection {
pub async fn create(url: String, token: String) -> anyhow::Result<Self> {
let db = libsql::Builder::new_remote(url, token).build().await?;
let inner = db.connect()?;
Ok(Self { inner })
}
}

impl LibSqlConnection {
pub async fn query(
&self,
query: &str,
parameters: Vec<sqlite::Value>,
max_result_bytes: usize,
) -> Result<sqlite::QueryResult, sqlite::Error> {
let result = self
.inner
.query(query, convert_parameters(&parameters))
.await
.map_err(|e| sqlite::Error::Io(e.to_string()))?;

Ok(sqlite::QueryResult {
columns: columns(&result),
rows: convert_rows(result, max_result_bytes)
.await
.map_err(|e| sqlite::Error::Io(e.to_string()))?,
})
}

pub async fn query_async(
&self,
query: &str,
parameters: Vec<v3::Value>,
max_result_bytes: usize,
) -> Result<QueryAsyncResult, v3::Error> {
let client = self.clone();
let query = query.to_string();

let (cols_tx, cols_rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -119,61 +183,6 @@ impl Connection for LazyLibSqlConnection {
})
}

async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
let client = self.get_or_create_connection().await?;
client.execute_batch(statements).await
}

async fn changes(&self) -> Result<u64, sqlite::Error> {
let client = self.get_or_create_connection().await?;
Ok(client.changes())
}

async fn last_insert_rowid(&self) -> Result<i64, sqlite::Error> {
let client = self.get_or_create_connection().await?;
Ok(client.last_insert_rowid())
}

fn summary(&self) -> Option<String> {
Some(format!("libSQL at {}", self.url))
}
}

/// An open connection to a libSQL server.
#[derive(Clone)]
pub struct LibSqlConnection {
inner: libsql::Connection,
}

impl LibSqlConnection {
pub async fn create(url: String, token: String) -> anyhow::Result<Self> {
let db = libsql::Builder::new_remote(url, token).build().await?;
let inner = db.connect()?;
Ok(Self { inner })
}
}

impl LibSqlConnection {
pub async fn query(
&self,
query: &str,
parameters: Vec<sqlite::Value>,
max_result_bytes: usize,
) -> Result<sqlite::QueryResult, sqlite::Error> {
let result = self
.inner
.query(query, convert_parameters(&parameters))
.await
.map_err(|e| sqlite::Error::Io(e.to_string()))?;

Ok(sqlite::QueryResult {
columns: columns(&result),
rows: convert_rows(result, max_result_bytes)
.await
.map_err(|e| sqlite::Error::Io(e.to_string()))?,
})
}

pub async fn execute_batch(&self, statements: &str) -> anyhow::Result<()> {
self.inner.execute_batch(statements).await?;

Expand Down
Loading