-
Notifications
You must be signed in to change notification settings - Fork 116
[tx] Start implementing bulk SQL insertion algorithms #214
Changes from all commits
1088596
59c1e5a
d0c5761
7b77b63
11e19f3
5e01cfe
287986e
c53bb5a
9017ebc
07eda9c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,55 +12,203 @@ | |
|
|
||
| /// Low-level functions for testing. | ||
|
|
||
| use std::collections::{BTreeSet}; | ||
| use std::io::{Write}; | ||
|
|
||
| use itertools::Itertools; | ||
| use rusqlite; | ||
| use rusqlite::types::{ToSql}; | ||
| use tabwriter::TabWriter; | ||
|
|
||
| use {to_namespaced_keyword}; | ||
| use edn::types::{Value}; | ||
| use ::{to_namespaced_keyword}; | ||
| use bootstrap; | ||
| use edn; | ||
| use edn::symbols; | ||
| use entids; | ||
| use mentat_tx::entities::{Entid}; | ||
| use types::{DB, TypedValue}; | ||
| use errors::Result; | ||
|
|
||
| /// Represents an assertion (*datom*) in the store. | ||
| /// Represents a *datom* (assertion) in the store. | ||
| #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] | ||
| pub struct Datom { | ||
| // TODO: generalize this. | ||
| e: Entid, | ||
| a: Entid, | ||
| v: Value, | ||
| tx: Option<i64>, | ||
| v: edn::Value, | ||
| tx: i64, | ||
| added: Option<bool>, | ||
| } | ||
|
|
||
| /// Represents a set of datoms (assertions) in the store. | ||
| pub struct Datoms(pub BTreeSet<Datom>); | ||
|
|
||
| /// Represents an ordered sequence of transactions in the store. | ||
| pub struct Transactions(pub Vec<Datoms>); | ||
|
|
||
| fn label_tx_id(tx: i64) -> edn::Value { | ||
| edn::Value::PlainSymbol(symbols::PlainSymbol::new(format!("?tx{}", tx - bootstrap::TX0))) | ||
| } | ||
|
|
||
| fn label_tx_instant(tx: i64) -> edn::Value { | ||
| edn::Value::PlainSymbol(symbols::PlainSymbol::new(format!("?ms{}", tx - bootstrap::TX0))) | ||
| } | ||
|
|
||
| impl Datom { | ||
| pub fn into_edn<T, U>(&self, tx_id: T, tx_instant: &U) -> edn::Value | ||
| where T: Fn(i64) -> edn::Value, U: Fn(i64) -> edn::Value { | ||
| let f = |entid: &Entid| -> edn::Value { | ||
| match *entid { | ||
| Entid::Entid(ref y) => edn::Value::Integer(y.clone()), | ||
| Entid::Ident(ref y) => edn::Value::NamespacedKeyword(y.clone()), | ||
| } | ||
| }; | ||
|
|
||
| // Rewrite [E :db/txInstant V] to [?txN :db/txInstant ?t0]. | ||
| let mut v = if self.a == Entid::Entid(entids::DB_TX_INSTANT) || self.a == Entid::Ident(to_namespaced_keyword(":db/txInstant").unwrap()) { | ||
| vec![tx_id(self.tx), | ||
| f(&self.a), | ||
| tx_instant(self.tx)] | ||
| } else { | ||
| vec![f(&self.e), f(&self.a), self.v.clone()] | ||
| }; | ||
| if let Some(added) = self.added { | ||
| v.push(tx_id(self.tx)); | ||
| v.push(edn::Value::Boolean(added)); | ||
| } | ||
|
|
||
| edn::Value::Vector(v) | ||
| } | ||
| } | ||
|
|
||
| /// Return the complete set of datoms in the store, ordered by (e, a, v). | ||
| pub fn datoms(conn: &rusqlite::Connection, db: &DB) -> Result<Vec<Datom>> { | ||
| // TODO: fewer magic numbers! | ||
| datoms_after(conn, db, &0x10000000) | ||
| impl Datoms { | ||
| pub fn into_edn_raw<T, U>(&self, tx_id: &T, tx_instant: &U) -> edn::Value | ||
| where T: Fn(i64) -> edn::Value, U: Fn(i64) -> edn::Value { | ||
| edn::Value::Set((&self.0).into_iter().map(|x| x.into_edn(tx_id, tx_instant)).collect()) | ||
| } | ||
|
|
||
| pub fn into_edn(&self) -> edn::Value { | ||
| self.into_edn_raw(&label_tx_id, &label_tx_instant) | ||
| } | ||
| } | ||
|
|
||
| impl Transactions { | ||
| pub fn into_edn_raw<T, U>(&self, tx_id: &T, tx_instant: &U) -> edn::Value | ||
| where T: Fn(i64) -> edn::Value, U: Fn(i64) -> edn::Value { | ||
| edn::Value::Vector((&self.0).into_iter().map(|x| x.into_edn_raw(tx_id, tx_instant)).collect()) | ||
| } | ||
|
|
||
| pub fn into_edn(&self) -> edn::Value { | ||
| self.into_edn_raw(&label_tx_id, &label_tx_instant) | ||
| } | ||
| } | ||
|
|
||
| /// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`. | ||
| fn to_entid(db: &DB, entid: i64) -> Entid { | ||
| db.schema.get_ident(&entid).and_then(|ident| to_namespaced_keyword(&ident)).map_or(Entid::Entid(entid), Entid::Ident) | ||
| } | ||
|
|
||
| /// Return the set of datoms in the store with transaction ID strictly | ||
| /// greater than the given `tx`, ordered by (tx, e, a, v). | ||
| pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: &i32) -> Result<Vec<Datom>> { | ||
| let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag FROM datoms WHERE tx > ? ORDER BY tx, e, a, v")?; | ||
| /// Return the set of datoms in the store, ordered by (e, a, v, tx), but not including any datoms of | ||
| /// the form [... :db/txInstant ...]. | ||
| pub fn datoms(conn: &rusqlite::Connection, db: &DB) -> Result<Datoms> { | ||
| datoms_after(conn, db, bootstrap::TX0 - 1) | ||
| } | ||
|
|
||
| /// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`, | ||
| /// ordered by (e, a, v, tx). | ||
| /// | ||
| /// The datom set returned does not include any datoms of the form [... :db/txInstant ...]. | ||
| pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Datoms> { | ||
| let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, v ASC, tx ASC")?; | ||
|
|
||
| let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| { | ||
| let e: i64 = row.get_checked(0)?; | ||
| let a: i64 = row.get_checked(1)?; | ||
|
|
||
| if a == entids::DB_TX_INSTANT { | ||
| return Ok(None); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. … so you don't need to do this.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It also occurs to me: we had better make sure that users never assign a new entity ID to any of the builtins!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, although if you footgun yourself you footgun yourself :) I think the route to only letting the transactor allocate entids is via #190. |
||
| } | ||
|
|
||
| let v: rusqlite::types::Value = row.get_checked(2)?; | ||
| let value_type_tag: i32 = row.get_checked(3)?; | ||
|
|
||
| let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; | ||
| let (value, _) = typed_value.to_edn_value_pair(); | ||
|
|
||
| let tx: i64 = row.get_checked(4)?; | ||
|
|
||
| Ok(Some(Datom { | ||
| e: to_entid(db, e), | ||
| a: to_entid(db, a), | ||
| v: value, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it's worth keeping the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is true, but the real goal of these debugging routines is to compare to EDN, c.f. #188. With that in mind, I'm not going to expose |
||
| tx: tx, | ||
| added: None, | ||
| })) | ||
| })?.collect(); | ||
|
|
||
| Ok(Datoms(r?.into_iter().filter_map(|x| x).collect())) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel there must be a simpler alternative to
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There might be a simpler way, but I don't know it. Something has to |
||
| } | ||
|
|
||
| // Convert numeric entid to entity Entid. | ||
| let to_entid = |x| { | ||
| db.schema.get_ident(&x).and_then(|y| to_namespaced_keyword(&y)).map(Entid::Ident).unwrap_or(Entid::Entid(x)) | ||
| }; | ||
| /// Return the sequence of transactions in the store with transaction ID strictly greater than the | ||
| /// given `tx`, ordered by (tx, e, a, v). | ||
| /// | ||
| /// Each transaction returned includes the [:db/tx :db/txInstant ...] datom. | ||
| pub fn transactions_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Transactions> { | ||
| let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, v ASC, added ASC")?; | ||
|
|
||
| let datoms = stmt.query_and_then(&[tx], |row| { | ||
| let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| { | ||
| let e: i64 = row.get_checked(0)?; | ||
| let a: i64 = row.get_checked(1)?; | ||
|
|
||
| let v: rusqlite::types::Value = row.get_checked(2)?; | ||
| let value_type_tag: i32 = row.get_checked(3)?; | ||
|
|
||
| let typed_value = TypedValue::from_sql_value_pair(v, &value_type_tag)?; | ||
| let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; | ||
| let (value, _) = typed_value.to_edn_value_pair(); | ||
|
|
||
| let tx: i64 = row.get_checked(4)?; | ||
| let added: bool = row.get_checked(5)?; | ||
|
|
||
| Ok(Datom { | ||
| e: to_entid(e), | ||
| a: to_entid(a), | ||
| e: to_entid(db, e), | ||
| a: to_entid(db, a), | ||
| v: value, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment about |
||
| tx: None, | ||
| tx: tx, | ||
| added: Some(added), | ||
| }) | ||
| })?.collect(); | ||
| datoms | ||
|
|
||
| // Group by tx. | ||
| let r: Vec<Datoms> = r?.into_iter().group_by(|x| x.tx).into_iter().map(|(_key, group)| Datoms(group.collect())).collect(); | ||
| Ok(Transactions(r)) | ||
| } | ||
|
|
||
| /// Execute the given `sql` query with the given `params` and format the results as a | ||
| /// tab-and-newline formatted string suitable for debug printing. | ||
| /// | ||
| /// The query is printed followed by a newline, then the returned columns followed by a newline, and | ||
| /// then the data rows and columns. All columns are aligned. | ||
| pub fn dump_sql_query(conn: &rusqlite::Connection, sql: &str, params: &[&ToSql]) -> Result<String> { | ||
| let mut stmt: rusqlite::Statement = conn.prepare(sql)?; | ||
|
|
||
| let mut tw = TabWriter::new(Vec::new()).padding(2); | ||
| write!(&mut tw, "{}\n", sql).unwrap(); | ||
|
|
||
| for column_name in stmt.column_names() { | ||
| write!(&mut tw, "{}\t", column_name).unwrap(); | ||
| } | ||
| write!(&mut tw, "\n").unwrap(); | ||
|
|
||
| let r: Result<Vec<_>> = stmt.query_and_then(params, |row| { | ||
| for i in 0..row.column_count() { | ||
| let value: rusqlite::types::Value = row.get_checked(i)?; | ||
| write!(&mut tw, "{:?}\t", value).unwrap(); | ||
| } | ||
| write!(&mut tw, "\n").unwrap(); | ||
| Ok(()) | ||
| })?.collect(); | ||
| r?; | ||
|
|
||
| let dump = String::from_utf8(tw.into_inner().unwrap()).unwrap(); | ||
| Ok(dump) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,14 +10,21 @@ | |
|
|
||
| #[macro_use] | ||
| extern crate error_chain; | ||
| extern crate itertools; | ||
| #[macro_use] | ||
| extern crate lazy_static; | ||
| extern crate rusqlite; | ||
| extern crate time; | ||
|
|
||
| extern crate tabwriter; | ||
|
|
||
| extern crate edn; | ||
| extern crate mentat_tx; | ||
| extern crate mentat_tx_parser; | ||
|
|
||
| use itertools::Itertools; | ||
| use std::iter::repeat; | ||
|
|
||
| pub use errors::*; | ||
| pub use schema::*; | ||
| pub use types::*; | ||
|
|
@@ -33,6 +40,8 @@ mod values; | |
|
|
||
| use edn::symbols; | ||
|
|
||
| pub const SQLITE_MAX_VARIABLE_NUMBER: usize = 999; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a compile-time option in SQLite. We should upstream a patch to rusqlite.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that We can get this value at runtime by calling This is the correct thing to do. Using Filed https://github.com/jgallagher/rusqlite/issues/220 to expose Alternatively, we could consider looping over values and firing off a prepared statement over and over. I don't know which is slower: constructing the concatenated string and running one query with 999 variables, or running a small prepared statement 250 times within a transaction! |
||
|
|
||
| pub fn to_namespaced_keyword(s: &str) -> Option<symbols::NamespacedKeyword> { | ||
| let splits = [':', '/']; | ||
| let mut i = s.split(&splits[..]); | ||
|
|
@@ -41,3 +50,26 @@ pub fn to_namespaced_keyword(s: &str) -> Option<symbols::NamespacedKeyword> { | |
| _ => None | ||
| } | ||
| } | ||
|
|
||
| /// Prepare an SQL `VALUES` block, like (?, ?, ?), (?, ?, ?). | ||
| /// | ||
| /// The number of values per tuple determines `(?, ?, ?)`. The number of tuples determines `(...), (...)`. | ||
| /// | ||
| /// # Examples | ||
| /// | ||
| /// ```rust | ||
| /// # use mentat_db::{repeat_values}; | ||
| /// assert_eq!(repeat_values(1, 3), "(?), (?), (?)".to_string()); | ||
| /// assert_eq!(repeat_values(3, 1), "(?, ?, ?)".to_string()); | ||
| /// assert_eq!(repeat_values(2, 2), "(?, ?), (?, ?)".to_string()); | ||
| /// ``` | ||
| pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String { | ||
| assert!(values_per_tuple >= 1); | ||
| assert!(tuples >= 1); | ||
| assert!(values_per_tuple * tuples < SQLITE_MAX_VARIABLE_NUMBER, "Too many values: {} * {} >= {}", values_per_tuple, tuples, SQLITE_MAX_VARIABLE_NUMBER); | ||
| // Like "(?, ?, ?)". | ||
| let inner = format!("({})", repeat("?").take(values_per_tuple).join(", ")); | ||
| // Like "(?, ?, ?), (?, ?, ?)". | ||
| let values: String = repeat(inner).take(tuples).join(", "); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it's a huge pain in the ass, but I'm really interested to see the tradeoffs between using a fixed static string and a single prepared statement called a few times, perhaps with a mutable values array — i.e., no allocations for each write — versus doing all of this allocation in Can you informally measure?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a ton involved in doing this, and it's not time yet. I've filed #263 to track this for real. This really shows when you try to import a Places database, which will be ... a while. |
||
| values | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WHERE tx > ? AND a IS NOT ?…", &[&tx, entids::DB_TX_INSTANT]? Might as well avoid all of those rows…There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this, and decided to do it as post-processing. There's other rewriting happening and I think the final form of these test functions might look a lot like a pattern matcher on
edn::Values, in which case I don't want to filter at all.This is really just a matter of taste, and I want to be as close to the actual DB contents in memory as I can be -- for now.