Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 53 additions & 57 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,76 +204,72 @@ impl kv::Adapter for Adapter {
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let connection_string = self.connection_string.clone();
let value_field = self.value_field.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let cloned_path = path.to_string();
let cloned_self = self.clone();

task::spawn_blocking(move || {
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
value_field, table, key_field
);
let conn = Connection::open(connection_string).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
let result = statement.query_row([cloned_path.as_str()], |row| row.get(0));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(Error::from(err)),
}
})
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
let result = statement.query_row([path], |row| row.get(0));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(Error::from(err)),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
let connection_string = self.connection_string.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let value_field = self.value_field.clone();
let cloned_path = path.to_string();
let cloned_value = value.to_vec();
let cloned_self = self.clone();

task::spawn_blocking(move || {
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
table, key_field, value_field
);
let conn = Connection::open(connection_string).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement
.execute(params![cloned_path, cloned_value])
.map_err(Error::from)?;
Ok(())
})
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
self.table, self.key_field, self.value_field
);
let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement
.execute(params![path, value])
.map_err(Error::from)?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
let connection_string = self.connection_string.clone();
let table = self.table.clone();
let key_field = self.key_field.clone();
let cloned_path = path.to_string();
let cloned_self = self.clone();

task::spawn_blocking(move || {
let conn = Connection::open(connection_string).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
})?;
let query = format!("DELETE FROM {} WHERE `{}` = $1", table, key_field);
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement
.execute([cloned_path.as_str()])
.map_err(Error::from)?;
Ok(())
})
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
.await
.map_err(Error::from)
.and_then(|inner_result| inner_result)
}

fn blocking_delete(&self, path: &str) -> Result<()> {
let conn = Connection::open(self.connection_string.clone()).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
})?;
let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field);
let mut statement = conn.prepare(&query).map_err(Error::from)?;
statement.execute([path]).map_err(Error::from)?;
Ok(())
}
}

Expand Down