From 0eaa822f4eae1bbe01c661a871becd222f02d535 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 8 Feb 2017 13:51:33 -0800 Subject: [PATCH 01/13] Pre: Implement batch [a v] pair lookup. --- db/src/db.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++++++- db/src/types.rs | 13 +++++++- 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 788c88984..d3f6dfe61 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -10,6 +10,7 @@ #![allow(dead_code)] +use std::collections::HashMap; use std::iter::once; use std::path::Path; @@ -35,7 +36,13 @@ use mentat_core::{ use mentat_tx::entities as entmod; use mentat_tx::entities::{Entity, OpType}; use errors::{ErrorKind, Result, ResultExt}; -use types::{DB, Partition, PartitionMap}; +use types::{ + AVMap, + AVPair, + DB, + Partition, + PartitionMap, +}; use schema::SchemaBuilding; pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { @@ -463,6 +470,77 @@ impl DB { } } + /// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples. + /// + /// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one + /// matching [e a v] triple exists. (If this is not true, some matching entid `e` will be + /// chosen non-deterministically, if one exists.) + /// + /// Returns a map &(a, v) -> e, to avoid cloning potentially large values. The keys of the map + /// are exactly those (a, v) pairs that have an assertion [e a v] in the datom store. + pub fn resolve_avs<'a>(&self, conn: &rusqlite::Connection, avs: &'a [&'a AVPair]) -> Result> { + // Start search_id's at some identifiable number. + let initial_search_id = 2000; + let bindings_per_statement = 4; + + // We map [a v] -> numeric search_id -> e, and then we use the search_id lookups to finally + // produce the map [a v] -> e. + // + // TODO: `collect` into a HashSet so that any (a, v) is resolved at most once. 
+ let chunks: itertools::IntoChunks<_> = avs.into_iter().enumerate().chunks(::SQLITE_MAX_VARIABLE_NUMBER / 4); + + // We'd like to `flat_map` here, but it's not obvious how to `flat_map` across `Result`. + // Alternatively, this is a `fold`, and it might be wise to express it as such. + let results: Result>> = chunks.into_iter().map(|chunk| -> Result> { + let mut count = 0; + + // We must keep these computed values somewhere to reference them later, so we can't + // combine this `map` and the subsequent `flat_map`. + let block: Vec<(i64, i64, ToSqlOutput<'a>, i32)> = chunk.map(|(index, &&(a, ref v))| { + count += 1; + let search_id: i64 = initial_search_id + index as i64; + let (value, value_type_tag) = v.to_sql_value_pair(); + (search_id, a, value, value_type_tag) + }).collect(); + + // `params` reference computed values in `block`. + let params: Vec<&ToSql> = block.iter().flat_map(|&(ref searchid, ref a, ref value, ref value_type_tag)| { + // Avoid inner heap allocation. + once(searchid as &ToSql) + .chain(once(a as &ToSql) + .chain(once(value as &ToSql) + .chain(once(value_type_tag as &ToSql)))) + }).collect(); + + // TODO: cache these statements for selected values of `count`. + // TODO: query against `datoms` and UNION ALL with `fulltext_datoms` rather than + // querying against `all_datoms`. We know all the attributes, and in the common case, + // where most unique attributes will not be fulltext-indexed, we'll be querying just + // `datoms`, which will be much faster. 
+ let values: String = repeat_values(bindings_per_statement, count); + let s: String = format!("WITH t(search_id, a, v, value_type_tag) AS (VALUES {}) SELECT t.search_id, d.e \ + FROM t, all_datoms AS d \ + WHERE d.index_avet IS NOT 0 AND d.a = t.a AND d.value_type_tag = t.value_type_tag AND d.v = t.v", + values); + let mut stmt: rusqlite::Statement = conn.prepare(s.as_str())?; + + let m: Result<Vec<(i64, Entid)>> = stmt.query_and_then(&params, |row| -> Result<(i64, Entid)> { + Ok((row.get_checked(0)?, row.get_checked(1)?)) + })?.collect(); + m + }).collect::<Result<Vec<Vec<(i64, Entid)>>>>(); + + // Flatten. + let results: Vec<(i64, Entid)> = results?.as_slice().concat(); + + // Create map [a v] -> e. + let m: HashMap<&'a AVPair, Entid> = results.into_iter().map(|(search_id, entid)| { + let index: usize = (search_id - initial_search_id) as usize; + (avs[index], entid) + }).collect(); + Ok(m) + } + /// Create empty temporary tables for search parameters and search results. fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> { // We can't do this in one shot, since we can't prepare a batch statement. diff --git a/db/src/types.rs b/db/src/types.rs index dbc3823cc..4f58575a3 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -10,7 +10,8 @@ #![allow(dead_code)] -use std::collections::{BTreeMap}; +use std::collections::HashMap; +use std::collections::BTreeMap; extern crate mentat_core; @@ -64,3 +65,13 @@ impl DB { } } } + +/// A pair [a v] in the store. +/// +/// Used to represent lookup-refs and [TEMPID a v] upserts as they are resolved. +pub type AVPair = (Entid, TypedValue); + +/// Map [a v] pairs to existing entids. +/// +/// Used to resolve lookup-refs and upserts. +pub type AVMap<'a> = HashMap<&'a AVPair, Entid>; From e5e3779f55444497926d8bdaac6774ccb948c6fd Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 8 Feb 2017 17:28:34 -0800 Subject: [PATCH 02/13] Pre: Add InternSet for sharing ref-counted handles to large values. 
--- core/src/intern_set.rs | 45 ++++++++++++++++++++++++++++++++++++++++++ core/src/lib.rs | 2 ++ 2 files changed, 47 insertions(+) create mode 100644 core/src/intern_set.rs diff --git a/core/src/intern_set.rs b/core/src/intern_set.rs new file mode 100644 index 000000000..490a337b1 --- /dev/null +++ b/core/src/intern_set.rs @@ -0,0 +1,45 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use std::collections::HashSet; +use std::hash::Hash; +use std::rc::Rc; + +/// An `InternSet` allows to "intern" some potentially large values, maintaining a single value +/// instance owned by the `InternSet` and leaving consumers with lightweight ref-counted handles to +/// the large owned value. This can avoid expensive clone() operations. +/// +/// In Mentat, such large values might be strings or arbitrary [a v] pairs. +/// +/// See https://en.wikipedia.org/wiki/String_interning for discussion. +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct InternSet where T: Eq + Hash { + pub inner: HashSet>, +} + +impl InternSet where T: Eq + Hash { + pub fn new() -> InternSet { + InternSet { + inner: HashSet::new(), + } + } + + /// Intern a value, providing a ref-counted handle to the interned value. 
+ pub fn intern(&mut self, value: T) -> Rc { + let key = Rc::new(value); + if self.inner.insert(key.clone()) { + key + } else { + self.inner.get(&key).unwrap().clone() + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index c85b55340..7ec9162d8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -272,3 +272,5 @@ mod test { assert!(attr2.flags() & AttributeBitFlags::UniqueValue as u8 != 0); } } + +pub mod intern_set; From 4fd776051a9a467318771d64f0f87e39b4d25889 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 8 Feb 2017 17:36:58 -0800 Subject: [PATCH 03/13] Pre: Derive more for Entity. --- tx/src/entities.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tx/src/entities.rs b/tx/src/entities.rs index c714cd3ad..ea6410edc 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -40,7 +40,7 @@ pub enum ValueOrLookupRef { LookupRef(LookupRef), } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub enum OpType { Add, Retract, From 8ae2b4a9170a547572ba3ca2afb2f44724286ff1 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Thu, 2 Feb 2017 12:07:03 -0800 Subject: [PATCH 04/13] Pre: Return DB from creating; return TxReport from transact. I explicitly am not supporting opening existing databases yet, let alone upgrading databases from earlier versions. That can follow fast once basic transactions are supported. 
--- db/src/db.rs | 65 ++++++++++++++++++++++++------------------------- db/src/lib.rs | 8 ++++++ db/src/types.rs | 14 +++++++++++ 3 files changed, 54 insertions(+), 33 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index d3f6dfe61..28a19b7a7 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -18,9 +18,8 @@ use itertools; use itertools::Itertools; use rusqlite; use rusqlite::types::{ToSql, ToSqlOutput}; -use time; -use ::{repeat_values, to_namespaced_keyword}; +use ::{now, repeat_values, to_namespaced_keyword}; use bootstrap; use edn::types::Value; use entids; @@ -36,14 +35,15 @@ use mentat_core::{ use mentat_tx::entities as entmod; use mentat_tx::entities::{Entity, OpType}; use errors::{ErrorKind, Result, ResultExt}; +use schema::SchemaBuilding; use types::{ AVMap, AVPair, DB, Partition, PartitionMap, + TxReport, }; -use schema::SchemaBuilding; pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { let conn = match uri.as_ref().to_string_lossy().len() { @@ -187,7 +187,7 @@ fn get_user_version(conn: &rusqlite::Connection) -> Result { } // TODO: rename "SQL" functions to align with "datoms" functions. -pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { +pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { let tx = conn.transaction()?; for statement in (&V2_STATEMENTS).iter() { @@ -204,14 +204,13 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { } let bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); - bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..], bootstrap::TX0)?; + bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..], bootstrap::TX0, now())?; set_user_version(&tx, CURRENT_VERSION)?; - let user_version = get_user_version(&tx)?; // TODO: use the drop semantics to do this automagically? 
tx.commit()?; - Ok(user_version) + Ok(bootstrap_db) } // (def v2-statements v1-statements) @@ -312,12 +311,12 @@ pub fn update_from_version(conn: &mut rusqlite::Connection, current_version: i32 Ok(user_version) } -pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result { +pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result { let user_version = get_user_version(&conn)?; match user_version { - CURRENT_VERSION => Ok(user_version), 0 => create_current_version(conn), - v => update_from_version(conn, v), + // TODO: support updating or re-opening an existing store. + v => bail!(ErrorKind::NotYetImplemented(format!("Opening databases with Mentat version: {}", v))), } } @@ -782,7 +781,7 @@ impl DB { /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. - pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity], tx: Entid) -> Result<()>{ + pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity], tx_id: Entid, tx_instant: i64) -> Result { // TODO: push these into an internal transaction report? /// Assertions that are :db.cardinality/one and not :db.fulltext. @@ -793,9 +792,7 @@ impl DB { // Transact [:db/add :db/txInstant NOW :db/tx]. // TODO: allow this to be present in the transaction data. 
- let now = time::get_time(); - let tx_instant = (now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000)); - non_fts_one.push((tx, + non_fts_one.push((tx_id, entids::DB_TX_INSTANT, TypedValue::Long(tx_instant), true)); @@ -886,27 +883,31 @@ impl DB { self.create_temp_tables(conn)?; if !non_fts_one.is_empty() { - self.insert_non_fts_searches(conn, &non_fts_one[..], tx, SearchType::Inexact)?; + self.insert_non_fts_searches(conn, &non_fts_one[..], tx_id, SearchType::Inexact)?; } if !non_fts_many.is_empty() { - self.insert_non_fts_searches(conn, &non_fts_many[..], tx, SearchType::Exact)?; + self.insert_non_fts_searches(conn, &non_fts_many[..], tx_id, SearchType::Exact)?; } self.search(conn)?; - self.insert_transaction(conn, tx)?; - self.update_datoms(conn, tx)?; + self.insert_transaction(conn, tx_id)?; + self.update_datoms(conn, tx_id)?; // TODO: update parts, idents, schema materialized views. - Ok(()) + Ok(TxReport { + tx_id: tx_id, + tx_instant: tx_instant, + }) } } #[cfg(test)] mod tests { use super::*; + use {now}; use bootstrap; use debug; use edn; @@ -947,7 +948,7 @@ mod tests { /// There is some magic here about transaction numbering that I don't want to commit to or /// document just yet. The end state might be much more general pattern matching syntax, rather /// than the targeted transaction ID and timestamp replacement we have right now. 
- fn assert_transactions(conn: &rusqlite::Connection, db: &DB, transactions: &Vec) { + fn assert_transactions(conn: &rusqlite::Connection, db: &mut DB, transactions: &Vec) { for (index, transaction) in transactions.into_iter().enumerate() { let index = index as i64; let transaction = transaction.as_map().unwrap(); @@ -957,10 +958,12 @@ mod tests { let expected_datoms: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-datoms"))); let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - db.transact_internal(&conn, &entities[..], bootstrap::TX0 + index + 1).unwrap(); + let _report = db.transact_internal(&conn, &entities[..], bootstrap::TX0 + index + 1, now()).unwrap(); + if let Some(expected_transaction) = expected_transaction { let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap(); + assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].into_edn(), *expected_transaction, "\n{} - expected transaction:\n{}\n{}", label, transactions.0[0].into_edn(), *expected_transaction); @@ -983,16 +986,14 @@ mod tests { #[test] fn test_add() { let mut conn = new_connection("").expect("Couldn't open in-memory db"); - assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION); - - let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema()); + let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. 
- let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); @@ -1000,28 +1001,26 @@ mod tests { let value = edn::parse::value(include_str!("../../tx/fixtures/test_add.edn")).unwrap(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &bootstrap_db, transactions); + assert_transactions(&conn, &mut db, transactions); } #[test] fn test_retract() { let mut conn = new_connection("").expect("Couldn't open in-memory db"); - assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION); - - let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema()); + let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. - let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); let value = edn::parse::value(include_str!("../../tx/fixtures/test_retract.edn")).unwrap(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &bootstrap_db, transactions); + assert_transactions(&conn, &mut db, transactions); } } diff --git a/db/src/lib.rs b/db/src/lib.rs index 463e5bd0c..099ab33e2 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -73,3 +73,11 @@ pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String { let values: String = repeat(inner).take(tuples).join(", "); values } + +/// Return the current time in milliseconds after the Unix epoch according to the local clock. 
+/// +/// Compare `Date.now()` in JavaScript, `System.currentTimeMillis` in Java. +pub fn now() -> i64 { + let now = time::get_time(); + (now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000)) +} diff --git a/db/src/types.rs b/db/src/types.rs index 4f58575a3..4374d7661 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -75,3 +75,17 @@ pub type AVPair = (Entid, TypedValue); /// /// Used to resolve lookup-refs and upserts. pub type AVMap<'a> = HashMap<&'a AVPair, Entid>; + +/// A transaction report summarizes an applied transaction. +// TODO: include map of resolved tempids. +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialOrd, PartialEq)] +pub struct TxReport { + /// The transaction ID of the transaction. + pub tx_id: Entid, + + /// The timestamp when the transaction was committed. + /// + /// This is milliseconds after the Unix epoch according to the transactor's local clock. + // TODO: :db.type/instant. + pub tx_instant: i64, +} From 8fe071e3a24022d61fda0fe161d5bba932a05045 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Thu, 2 Feb 2017 15:27:16 -0800 Subject: [PATCH 05/13] Pre: Parse string temporary ID entities; remove ValueOrLookupRef. This adds TempId entities, but we can't disambiguate String temporary IDs from values without the use of the schema, so there's no new value branch. Similarly, we can't disambiguate lookup-ref values from two element list values without a schema, so we remove this entirely. We'll handle the ambiguity later in the transactor. 
--- db/src/db.rs | 8 ++++---- tx-parser/src/lib.rs | 38 ++++++++++++++++++++++---------------- tx-parser/tests/parser.rs | 26 ++++++++++++++++---------- tx/src/entities.rs | 9 +++++---- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 28a19b7a7..7dba4e388 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -804,9 +804,9 @@ impl DB { match *entity { Entity::AddOrRetract { op: OpType::Add, - e: entmod::EntidOrLookupRef::Entid(ref e_), + e: entmod::EntidOrLookupRefOrTempId::Entid(ref e_), a: ref a_, - v: entmod::ValueOrLookupRef::Value(ref v_)} => { + v: ref v_ } => { let e: i64 = match e_ { &entmod::Entid::Entid(ref e__) => *e__, @@ -839,9 +839,9 @@ impl DB { Entity::AddOrRetract { op: OpType::Retract, - e: entmod::EntidOrLookupRef::Entid(ref e_), + e: entmod::EntidOrLookupRefOrTempId::Entid(ref e_), a: ref a_, - v: entmod::ValueOrLookupRef::Value(ref v_) } => { + v: ref v_ } => { let e: i64 = match e_ { &entmod::Entid::Entid(ref e__) => *e__, diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 598f7eeff..1fc96375a 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -21,7 +21,7 @@ use combine::{any, eof, many, parser, satisfy_map, token, Parser, ParseResult, S use combine::combinator::{Expected, FnParser}; use edn::symbols::NamespacedKeyword; use edn::types::Value; -use mentat_tx::entities::{Entid, EntidOrLookupRef, Entity, LookupRef, OpType, ValueOrLookupRef}; +use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType}; use mentat_parser_utils::ResultParser; pub struct Tx(::std::marker::PhantomData I>); @@ -64,20 +64,25 @@ def_parser_fn!(Tx, lookup_ref, Value, LookupRef, input, { .parse_stream(input) }); -def_parser_fn!(Tx, entid_or_lookup_ref, Value, EntidOrLookupRef, input, { - Tx::::entid() - .map(|x| EntidOrLookupRef::Entid(x)) - .or(Tx::::lookup_ref().map(|x| EntidOrLookupRef::LookupRef(x))) +def_parser_fn!(Tx, entid_or_lookup_ref_or_temp_id, Value, 
EntidOrLookupRefOrTempId, input, { + Tx::::entid().map(|x| EntidOrLookupRefOrTempId::Entid(x)) + .or(Tx::::lookup_ref().map(|x| EntidOrLookupRefOrTempId::LookupRef(x))) + .or(Tx::::temp_id().map(|x| EntidOrLookupRefOrTempId::TempId(x))) .parse_lazy(input) .into() }); +def_parser_fn!(Tx, temp_id, Value, String, input, { + satisfy_map(|x: Value| x.into_text()) + .parse_stream(input) +}); + // TODO: abstract the "match Vector, parse internal stream" pattern to remove this boilerplate. def_parser_fn!(Tx, add, Value, Entity, input, { satisfy_map(|x: Value| -> Option { if let Value::Vector(y) = x { let mut p = (token(Value::NamespacedKeyword(NamespacedKeyword::new("db", "add"))), - Tx::<&[Value]>::entid_or_lookup_ref(), + Tx::<&[Value]>::entid_or_lookup_ref_or_temp_id(), Tx::<&[Value]>::entid(), // TODO: handle lookup-ref. any(), @@ -87,7 +92,7 @@ def_parser_fn!(Tx, add, Value, Entity, input, { op: OpType::Add, e: e, a: a, - v: ValueOrLookupRef::Value(v), + v: v, } }); // TODO: use ok() with a type annotation rather than explicit match. @@ -106,7 +111,7 @@ def_parser_fn!(Tx, retract, Value, Entity, input, { satisfy_map(|x: Value| -> Option { if let Value::Vector(y) = x { let mut p = (token(Value::NamespacedKeyword(NamespacedKeyword::new("db", "retract"))), - Tx::<&[Value]>::entid_or_lookup_ref(), + Tx::<&[Value]>::entid_or_lookup_ref_or_temp_id(), Tx::<&[Value]>::entid(), // TODO: handle lookup-ref. any(), @@ -116,7 +121,7 @@ def_parser_fn!(Tx, retract, Value, Entity, input, { op: OpType::Retract, e: e, a: a, - v: ValueOrLookupRef::Value(v), + v: v, } }); // TODO: use ok() with a type annotation rather than explicit match. 
@@ -170,6 +175,7 @@ mod tests { use combine::Parser; use edn::symbols::NamespacedKeyword; use edn::types::Value; + use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType}; fn kw(namespace: &str, name: &str) -> Value { Value::NamespacedKeyword(NamespacedKeyword::new(namespace, name)) @@ -186,10 +192,10 @@ mod tests { assert_eq!(result, Ok((Entity::AddOrRetract { op: OpType::Add, - e: EntidOrLookupRef::Entid(Entid::Ident(NamespacedKeyword::new("test", - "entid"))), + e: EntidOrLookupRefOrTempId::Entid(Entid::Ident(NamespacedKeyword::new("test", + "entid"))), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), }, &[][..]))); } @@ -205,9 +211,9 @@ mod tests { assert_eq!(result, Ok((Entity::AddOrRetract { op: OpType::Retract, - e: EntidOrLookupRef::Entid(Entid::Entid(101)), + e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), }, &[][..]))); } @@ -224,12 +230,12 @@ mod tests { assert_eq!(result, Ok((Entity::AddOrRetract { op: OpType::Add, - e: EntidOrLookupRef::LookupRef(LookupRef { + e: EntidOrLookupRefOrTempId::LookupRef(LookupRef { a: Entid::Ident(NamespacedKeyword::new("test", "a1")), v: Value::Text("v1".into()), }), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), }, &[][..]))); } diff --git a/tx-parser/tests/parser.rs b/tx-parser/tests/parser.rs index 66d29e43d..f0a7add4a 100644 --- a/tx-parser/tests/parser.rs +++ b/tx-parser/tests/parser.rs @@ -16,15 +16,15 @@ extern crate mentat_tx_parser; use edn::parse; use edn::symbols::NamespacedKeyword; use edn::types::Value; -use mentat_tx::entities::{Entid, EntidOrLookupRef, Entity, OpType, ValueOrLookupRef}; +use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, 
OpType}; use mentat_tx_parser::Tx; #[test] fn test_entities() { - - // TODO: align with whitespace after the EDN parser ignores more whitespace. - let input = r#"[[:db/add 101 :test/a "v"] -[:db/retract 102 :test/b "w"]]"#; + let input = r#" +[[:db/add 101 :test/a "v"] + [:db/add "tempid" :test/a "v"] + [:db/retract 102 :test/b "w"]]"#; let edn = parse::value(input).unwrap(); let input = [edn]; @@ -33,16 +33,22 @@ fn test_entities() { assert_eq!(result, Ok(vec![ Entity::AddOrRetract { - e: EntidOrLookupRef::Entid(Entid::Entid(101)), + op: OpType::Add, + e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), + }, + Entity::AddOrRetract { op: OpType::Add, + e: EntidOrLookupRefOrTempId::TempId("tempid".into()), + a: Entid::Ident(NamespacedKeyword::new("test", "a")), + v: Value::Text("v".into()), }, Entity::AddOrRetract { - e: EntidOrLookupRef::Entid(Entid::Entid(102)), - a: Entid::Ident(NamespacedKeyword::new("test", "b")), - v: ValueOrLookupRef::Value(Value::Text("w".into())), op: OpType::Retract, + e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(102)), + a: Entid::Ident(NamespacedKeyword::new("test", "b")), + v: Value::Text("w".into()), }, ])); } diff --git a/tx/src/entities.rs b/tx/src/entities.rs index ea6410edc..04a6126f7 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -35,9 +35,10 @@ pub enum EntidOrLookupRef { } #[derive(Clone, Debug, PartialEq)] -pub enum ValueOrLookupRef { - Value(Value), +pub enum EntidOrLookupRefOrTempId { + Entid(Entid), LookupRef(LookupRef), + TempId(String), } #[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] @@ -50,8 +51,8 @@ pub enum OpType { pub enum Entity { AddOrRetract { op: OpType, - e: EntidOrLookupRef, + e: EntidOrLookupRefOrTempId, a: Entid, - v: ValueOrLookupRef, + v: Value, }, } From 73eead0c537384a23a311cb76ff9cdae5d2282dd Mon Sep 17 00:00:00 2001 From: Nick 
Alexander Date: Thu, 2 Feb 2017 12:19:00 -0800 Subject: [PATCH 06/13] Persist partitions to SQL store; allocate transaction ID. (#186) --- db/src/db.rs | 78 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 7dba4e388..352f1b668 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -11,7 +11,8 @@ #![allow(dead_code)] use std::collections::HashMap; -use std::iter::once; +use std::iter::{once, repeat}; +use std::ops::Range; use std::path::Path; use itertools; @@ -160,6 +161,7 @@ lazy_static! { r#"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value BLOB NOT NULL, value_type_tag SMALLINT NOT NULL, FOREIGN KEY (ident) REFERENCES idents (ident))"#, r#"CREATE INDEX idx_schema_unique ON schema (ident, attr, value, value_type_tag)"#, + // TODO: store entid instead of ident for partition name. r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"#, ] }; @@ -198,13 +200,15 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // TODO: think more carefully about allocating new parts and bitmasking part ranges. // TODO: install these using bootstrap assertions. It's tricky because the part ranges are implicit. // TODO: one insert, chunk into 999/3 sections, for safety. + // This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing. for (part, partition) in bootstrap_partition_map.iter() { // TODO: Convert "keyword" part to SQL using Value conversion. tx.execute("INSERT INTO parts VALUES (?, ?, ?)", &[part, &partition.start, &partition.index])?; } - let bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); - bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..], bootstrap::TX0, now())?; + // TODO: return to transact_internal to self manage the encompassing SQLite transaction. 
+ let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); + bootstrap_db.transact(&tx, &bootstrap::bootstrap_entities()[..])?; set_user_version(&tx, CURRENT_VERSION)?; @@ -776,12 +780,71 @@ impl DB { Ok(()) } + /// Update the current partition map materialized view. + // TODO: only update changed partitions. + fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> { + let values_per_statement = 2; + let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement; + if self.partition_map.len() > max_partitions { + bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions))); + } + + // Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END". + let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", + repeat("WHEN part = ? THEN ?").take(self.partition_map.len()).join(" ")); + + let params: Vec<&ToSql> = self.partition_map.iter().flat_map(|(name, partition)| { + once(name as &ToSql) + .chain(once(&partition.index as &ToSql)) + }).collect(); + + // TODO: only cache the latest of these statements. Changing the set of partitions isn't + // supported in the Clojure implementation at all, and might not be supported in Mentat soon, + // so this is very low priority. + let mut stmt = conn.prepare_cached(s.as_str())?; + stmt.execute(&params[..]) + .map(|_c| ()) + .chain_err(|| "Could not update partition map") + } + + /// Allocate a single fresh entid in the given `partition`. + fn allocate_entid(&mut self, partition: String) -> i64 { + self.allocate_entids(partition, 1).start + } + + /// Allocate `n` fresh entids in the given `partition`. + fn allocate_entids(&mut self, partition: String, n: usize) -> Range<i64> { + match self.partition_map.get_mut(&partition) { + Some(mut partition) => { + let idx = partition.index; + partition.index += n as i64; + idx..partition.index + }, + // This is a programming error. 
+ None => panic!("Cannot allocate entid from unknown partition: {}", partition), + } + } + + /// Transact the given `entities` against the given SQLite `conn`, using the metadata in + /// `self.DB`. + /// + /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. + // TODO: move this to the transactor layer. + pub fn transact(&mut self, conn: &rusqlite::Connection, entities: &[Entity]) -> Result { + let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. + let tx = self.allocate_entid(":db.part/tx".to_string()); + + // Eventually, this will be responsible for managing a SQLite transaction. For now, it's + // just about the tx details. + self.transact_internal(conn, entities, tx, tx_instant) + } + /// Transact the given `entities` against the given SQLite `conn`, using the metadata in /// `self.DB`. /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. - pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity], tx_id: Entid, tx_instant: i64) -> Result { + pub fn transact_internal(&mut self, conn: &rusqlite::Connection, entities: &[Entity], tx_id: Entid, tx_instant: i64) -> Result { // TODO: push these into an internal transaction report? /// Assertions that are :db.cardinality/one and not :db.fulltext. @@ -895,7 +958,8 @@ impl DB { self.insert_transaction(conn, tx_id)?; self.update_datoms(conn, tx_id)?; - // TODO: update parts, idents, schema materialized views. + // TODO: update idents and schema materialized views. 
+ self.update_partition_map(conn)?; Ok(TxReport { tx_id: tx_id, @@ -907,7 +971,6 @@ impl DB { #[cfg(test)] mod tests { use super::*; - use {now}; use bootstrap; use debug; use edn; @@ -958,8 +1021,9 @@ mod tests { let expected_datoms: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-datoms"))); let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - let _report = db.transact_internal(&conn, &entities[..], bootstrap::TX0 + index + 1, now()).unwrap(); + let report = db.transact(&conn, &entities[..]).unwrap(); + assert_eq!(report.tx_id, bootstrap::TX0 + index + 1); if let Some(expected_transaction) = expected_transaction { let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap(); From 2ca0fa5fa407bdb7b7366d90cf3aa5089f33c717 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Thu, 9 Feb 2017 21:19:12 -0800 Subject: [PATCH 07/13] Implement tempid upsert resolution algorithm. 
(#184) --- db/src/db.rs | 237 ++++++++++++++++++++++---------- db/src/internal_types.rs | 85 ++++++++++++ db/src/lib.rs | 2 + db/src/upsert_resolution.rs | 265 ++++++++++++++++++++++++++++++++++++ 4 files changed, 518 insertions(+), 71 deletions(-) create mode 100644 db/src/internal_types.rs create mode 100644 db/src/upsert_resolution.rs diff --git a/db/src/db.rs b/db/src/db.rs index 352f1b668..d769de90b 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -10,7 +10,8 @@ #![allow(dead_code)] -use std::collections::HashMap; +use std; +use std::collections::{BTreeSet, HashMap}; use std::iter::{once, repeat}; use std::ops::Range; use std::path::Path; @@ -33,6 +34,7 @@ use mentat_core::{ TypedValue, ValueType, }; +use mentat_core::intern_set; use mentat_tx::entities as entmod; use mentat_tx::entities::{Entity, OpType}; use errors::{ErrorKind, Result, ResultExt}; @@ -45,6 +47,17 @@ use types::{ PartitionMap, TxReport, }; +use internal_types::{ + replace_lookup_ref, + LookupRefOrTempId, + TempId, + TempIdMap, + Term, + TermWithTempIds, + TermWithTempIdsAndLookupRefs, + TermWithoutTempIds, +}; +use upsert_resolution::Generation; pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { let conn = match uri.as_ref().to_string_lossy().len() { @@ -208,7 +221,7 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // TODO: return to transact_internal to self manage the encompassing SQLite transaction. let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); - bootstrap_db.transact(&tx, &bootstrap::bootstrap_entities()[..])?; + bootstrap_db.transact(&tx, bootstrap::bootstrap_entities())?; set_user_version(&tx, CURRENT_VERSION)?; @@ -544,6 +557,38 @@ impl DB { Ok(m) } + /// TODO: explain what this is doing. + pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result { + if temp_id_avs.is_empty() { + return Ok(TempIdMap::default()); + } + + // Map [a v]->entid. 
+ let mut av_pairs: Vec<&AVPair> = vec![]; + for i in 0..temp_id_avs.len() { + av_pairs.push(&temp_id_avs[i].1); + } + + // Lookup in the store. + let av_map: AVMap = self.resolve_avs(conn, &av_pairs[..])?; + + // Map id->entid. + let mut temp_id_map: TempIdMap = TempIdMap::default(); + for &(ref temp_id, ref av_pair) in temp_id_avs { + if let Some(n) = av_map.get(&av_pair) { + if let Some(previous_n) = temp_id_map.get(&*temp_id) { + if n != previous_n { + // Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction. + bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX + } + } + temp_id_map.insert(temp_id.clone(), *n); + } + } + + Ok((temp_id_map)) + } + /// Create empty temporary tables for search parameters and search results. fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> { // We can't do this in one shot, since we can't prepare a batch statement. @@ -825,12 +870,87 @@ impl DB { } } + /// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term + /// rewriting. + /// + /// The `Term` instances produce share interned TempId and LookupRef handles. 
+ fn entities_into_terms_with_temp_ids_and_lookup_refs(&self, entities: I) -> Result> where I: IntoIterator { + let mut temp_ids = intern_set::InternSet::new(); + + entities.into_iter() + .map(|entity: Entity| -> Result { + match entity { + Entity::AddOrRetract { op, e, a, v } => { + let a: i64 = match a { + entmod::Entid::Entid(ref a) => *a, + entmod::Entid::Ident(ref a) => self.schema.require_entid(&a.to_string())?, + }; + + let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; + + let e = match e { + entmod::EntidOrLookupRefOrTempId::Entid(e) => { + let e: i64 = match e { + entmod::Entid::Entid(ref e) => *e, + entmod::Entid::Ident(ref e) => self.schema.require_entid(&e.to_string())?, + }; + std::result::Result::Ok(e) + }, + + entmod::EntidOrLookupRefOrTempId::TempId(e) => { + std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(e))) + }, + + entmod::EntidOrLookupRefOrTempId::LookupRef(_) => { + // TODO: reference entity and initial input. + bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) + }, + }; + + let v = { + if attribute.value_type == ValueType::Ref && v.is_text() { + std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone()))) + } else if attribute.value_type == ValueType::Ref && v.is_vector() && v.as_vector().unwrap().len() == 2 { + bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) + } else { + // Here is where we do schema-aware typechecking: we either assert that + // the given value is in the attribute's value set, or (in limited + // cases) coerce the value into the attribute's value set. 
+ let typed_value: TypedValue = self.to_typed_value(&v, &attribute)?; + + std::result::Result::Ok(typed_value) + } + }; + + Ok(Term::AddOrRetract(op, e, a, v)) + }, + } + }) + .collect::>>() + } + + /// Pipeline stage 2: rewrite `Term` instances with lookup refs into `Term` instances without + /// lookup refs. + /// + /// The `Term` instances produce share interned TempId handles and have no LookupRef references. + fn resolve_lookup_refs(&self, lookup_ref_map: &AVMap, terms: I) -> Result> where I: IntoIterator { + terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result { + match term { + Term::AddOrRetract(op, e, a, v) => { + let e = replace_lookup_ref(&lookup_ref_map, e, |x| x)?; + let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?; + Ok(Term::AddOrRetract(op, e, a, v)) + }, + } + }).collect::>>() + } + /// Transact the given `entities` against the given SQLite `conn`, using the metadata in /// `self.DB`. /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. - pub fn transact(&mut self, conn: &rusqlite::Connection, entities: &[Entity]) -> Result { + pub fn transact(&mut self, conn: &rusqlite::Connection, entities: I) -> Result where I: IntoIterator { let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. let tx = self.allocate_entid(":db.part/tx".to_string()); @@ -844,7 +964,7 @@ impl DB { /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. - pub fn transact_internal(&mut self, conn: &rusqlite::Connection, entities: &[Entity], tx_id: Entid, tx_instant: i64) -> Result { + pub fn transact_internal(&mut self, conn: &rusqlite::Connection, entities: I, tx_id: Entid, tx_instant: i64) -> Result where I: IntoIterator { // TODO: push these into an internal transaction report? 
/// Assertions that are :db.cardinality/one and not :db.fulltext. @@ -860,88 +980,63 @@ impl DB { TypedValue::Long(tx_instant), true)); - // Right now, this could be a for loop, saving some mapping, collecting, and type - // annotations. However, I expect it to be a multi-stage map as we start to transform the - // underlying entities, in which case this expression is more natural than for loops. - let r: Vec> = entities.into_iter().map(|entity: &Entity| -> Result<()> { - match *entity { - Entity::AddOrRetract { - op: OpType::Add, - e: entmod::EntidOrLookupRefOrTempId::Entid(ref e_), - a: ref a_, - v: ref v_ } => { - - let e: i64 = match e_ { - &entmod::Entid::Entid(ref e__) => *e__, - &entmod::Entid::Ident(ref e__) => self.schema.require_entid(&e__.to_string())?, - }; - - let a: i64 = match a_ { - &entmod::Entid::Entid(ref a__) => *a__, - &entmod::Entid::Ident(ref a__) => self.schema.require_entid(&a__.to_string())?, - }; + // We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable. + let lookup_refs: intern_set::InternSet = intern_set::InternSet::new(); - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; - if attribute.fulltext { - bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity))) - } + // TODO: extract the tempids set as well. + // Pipeline stage 1: entities -> terms with tempids and lookup refs. + let terms_with_temp_ids_and_lookup_refs = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; - // This is our chance to do schema-aware typechecking: to either assert that the - // given value is in the attribute's value set, or (in limited cases) to coerce - // the value into the attribute's value set. - let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?; + // Pipeline stage 2: resolve lookup refs -> terms with tempids. 
+ let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect(); + let lookup_ref_map: AVMap = self.resolve_avs(conn, &lookup_ref_avs[..])?; - let added = true; - if attribute.multival { - non_fts_many.push((e, a, typed_value, added)); - } else { - non_fts_one.push((e, a, typed_value, added)); - } - Ok(()) - }, + let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; + + // Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs. + // Now we can collect upsert populations. + let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, self)?; - Entity::AddOrRetract { - op: OpType::Retract, - e: entmod::EntidOrLookupRefOrTempId::Entid(ref e_), - a: ref a_, - v: ref v_ } => { + // And evolve them forward. + while generation.can_evolve() { + // Evolve further. + let temp_id_map = self.resolve_temp_id_avs(conn, &generation.temp_id_avs()[..])?; + generation = generation.evolve_one_step(&temp_id_map); + } + + // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic. + let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); + + // TODO: track partitions for temporary IDs. 
+ let entids = self.allocate_entids(":db.part/user".to_string(), unresolved_temp_ids.len()); - let e: i64 = match e_ { - &entmod::Entid::Entid(ref e__) => *e__, - &entmod::Entid::Ident(ref e__) => self.schema.require_entid(&e__.to_string())?, - }; + let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); - let a: i64 = match a_ { - &entmod::Entid::Entid(ref a__) => *a__, - &entmod::Entid::Ident(ref a__) => self.schema.require_entid(&a__.to_string())?, - }; + let final_populations = generation.into_final_populations(&temp_id_allocations)?; + let final_terms: Vec = [final_populations.resolved, + final_populations.allocated, + inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + // Pipeline stage 4: final terms (after rewriting) -> DB insertions. + // Collect into non_fts_*. + // TODO: use something like Clojure's group_by to do this. + for term in final_terms { + match term { + Term::AddOrRetract(op, e, a, v) => { let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; if attribute.fulltext { - bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity))) + bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult! } - // This is our chance to do schema-aware typechecking: to either assert that the - // given value is in the attribute's value set, or (in limited cases) to coerce - // the value into the attribute's value set. 
- let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?; - - let added = false; - + let added = op == OpType::Add; if attribute.multival { - non_fts_many.push((e, a, typed_value, added)); + non_fts_many.push((e, a, v, added)); } else { - non_fts_one.push((e, a, typed_value, added)); + non_fts_one.push((e, a, v, added)); } - Ok(()) }, - - _ => bail!(ErrorKind::NotYetImplemented(format!("Transacting this entity is not yet implemented: {:?}", entity))) } - }).collect(); - - let r: Result> = r.into_iter().collect(); - r?; + } self.create_temp_tables(conn)?; @@ -1022,7 +1117,7 @@ mod tests { let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - let report = db.transact(&conn, &entities[..]).unwrap(); + let report = db.transact(&conn, entities).unwrap(); assert_eq!(report.tx_id, bootstrap::TX0 + index + 1); if let Some(expected_transaction) = expected_transaction { diff --git a/db/src/internal_types.rs b/db/src/internal_types.rs new file mode 100644 index 000000000..f71677f8c --- /dev/null +++ b/db/src/internal_types.rs @@ -0,0 +1,85 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +//! Types used only within the transactor. These should not be exposed outside of this crate. 
+ +use std; +use std::collections::HashMap; +use std::rc::Rc; + +use errors; +use errors::ErrorKind; +use types::*; +use mentat_tx::entities::OpType; + +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub enum Term { + AddOrRetract(OpType, E, Entid, V), +} + +pub type EntidOr = std::result::Result; +pub type TypedValueOr = std::result::Result; + +pub type TempId = Rc; +pub type TempIdMap = HashMap; + +pub type LookupRef = Rc; + +/// Internal representation of an entid on its way to resolution. We either have the simple case (a +/// numeric entid), a lookup-ref that still needs to be resolved (an atomized [a v] pair), or a temp +/// ID that needs to be upserted or allocated (an atomized tempid). +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub enum LookupRefOrTempId { + LookupRef(LookupRef), + TempId(TempId) +} + +pub type TermWithTempIdsAndLookupRefs = Term, TypedValueOr>; +pub type TermWithTempIds = Term, TypedValueOr>; +pub type TermWithoutTempIds = Term; +pub type Population = Vec; + +impl TermWithTempIds { + // These have no tempids by definition, and just need to be unwrapped. This operation might + // also be called "lowering" or "level lowering", but the concept of "unwrapping" is common in + // Rust and seems appropriate here. + pub fn unwrap(self) -> TermWithoutTempIds { + match self { + Term::AddOrRetract(op, Ok(n), a, Ok(v)) => Term::AddOrRetract(op, n, a, v), + _ => unreachable!(), + } + } +} + +/// Given an `EntidOr` or a `TypedValueOr`, replace any internal `LookupRef` with the entid from +/// the given map. Fail if any `LookupRef` cannot be replaced. +/// +/// `lift` allows to specify how the entid found is mapped into the output type. (This could +/// also be an `Into` or `From` requirement.) +/// +/// The reason for this awkward expression is that we're parameterizing over the _type constructor_ +/// (`EntidOr` or `TypedValueOr`), which is not trivial to express in Rust. 
This only works because +/// they're both the same `Result<...>` type with different parameterizations. +pub fn replace_lookup_ref(lookup_map: &AVMap, desired_or: Result, lift: U) -> errors::Result> where U: FnOnce(Entid) -> T { + match desired_or { + Ok(desired) => Ok(Ok(desired)), // N.b., must unwrap here -- the ::Ok types are different! + Err(other) => { + match other { + LookupRefOrTempId::TempId(t) => Ok(Err(t)), + LookupRefOrTempId::LookupRef(av) => lookup_map.get(&*av) + .map(|x| lift(*x)).map(Ok) + // XXX TODO: fix this error kind! + .ok_or_else(|| ErrorKind::UnrecognizedIdent(format!("couldn't lookup [a v]: {:?}", (*av).clone())).into()), + } + } + } +} diff --git a/db/src/lib.rs b/db/src/lib.rs index 099ab33e2..12b18dc5f 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -33,6 +33,8 @@ mod entids; mod errors; mod schema; mod types; +mod internal_types; +mod upsert_resolution; mod values; pub use types::DB; diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs new file mode 100644 index 000000000..5c34b17c8 --- /dev/null +++ b/db/src/upsert_resolution.rs @@ -0,0 +1,265 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +//! This module implements the upsert resolution algorithm described at +//! https://github.com/mozilla/mentat/wiki/Transacting:-upsert-resolution-algorithm. 
+ +use std::collections::BTreeSet; + +use mentat_tx::entities::OpType; +use errors; +use errors::ErrorKind; +use types::{Attribute, AVPair, DB, Entid, TypedValue}; +use internal_types::*; +use schema::SchemaBuilding; + +/// A "Simple upsert" that looks like [:db/add TEMPID a v], where a is :db.unique/identity. +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +struct UpsertE(TempId, Entid, TypedValue); + +/// A "Complex upsert" that looks like [:db/add TEMPID a OTHERID], where a is :db.unique/identity +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +struct UpsertEV(TempId, Entid, TempId); + +/// A generation collects entities into populations at a single evolutionary step in the upsert +/// resolution evolution process. +/// +/// The upsert resolution process is only concerned with [:db/add ...] entities until the final +/// entid allocations. That's why we separate into special simple and complex upsert types +/// immediately, and then collect the more general term types for final resolution. +#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub struct Generation { + /// "Simple upserts" that look like [:db/add TEMPID a v], where a is :db.unique/identity. + upserts_e: Vec, + + /// "Complex upserts" that look like [:db/add TEMPID a OTHERID], where a is :db.unique/identity + upserts_ev: Vec, + + /// Entities that look like: + /// - [:db/add TEMPID b OTHERID], where b is not :db.unique/identity; + /// - [:db/add TEMPID b v], where b is not :db.unique/identity. + /// - [:db/add e b OTHERID]. + allocations: Vec, + + /// Entities that upserted and no longer reference tempids. These assertions are guaranteed to + /// be in the store. + upserted: Vec, + + /// Entities that resolved due to other upserts and no longer reference tempids. These + /// assertions may or may not be in the store. + resolved: Vec, +} + +#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub struct FinalPopulations { + /// Upserts that upserted. 
+ pub upserted: Vec, + + /// Allocations that resolved due to other upserts. + pub resolved: Vec, + + /// Allocations that required new entid allocations. + pub allocated: Vec, +} + +impl Generation { + /// Split entities into a generation of populations that need to evolve to have their tempids + /// resolved or allocated, and a population of inert entities that do not reference tempids. + pub fn from(terms: I, db: &DB) -> errors::Result<(Generation, Population)> where I: IntoIterator { + let mut generation = Generation::default(); + let mut inert = vec![]; + + let is_unique = |a: &Entid| -> errors::Result { + let attribute: &Attribute = db.schema.require_attribute_for_entid(*a)?; + Ok(attribute.unique_identity) + }; + + for term in terms.into_iter() { + match term { + Term::AddOrRetract(op, Err(e), a, Err(v)) => { + if op == OpType::Add && is_unique(&a)? { + generation.upserts_ev.push(UpsertEV(e, a, v)); + } else { + generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Err(v))); + } + }, + Term::AddOrRetract(op, Err(e), a, Ok(v)) => { + if op == OpType::Add && is_unique(&a)? { + generation.upserts_e.push(UpsertE(e, a, v)); + } else { + generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Ok(v))); + } + }, + Term::AddOrRetract(op, Ok(e), a, Err(v)) => { + generation.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(v))); + }, + Term::AddOrRetract(op, Ok(e), a, Ok(v)) => { + inert.push(Term::AddOrRetract(op, Ok(e), a, Ok(v))); + }, + } + } + + Ok((generation, inert)) + } + + /// Return true if it's possible to evolve this generation further. + /// + /// There can be complex upserts but no simple upserts to help resolve them. We accept the + /// overhead of having the database try to resolve an empty set of simple upserts, to avoid + /// having to special case complex upserts at entid allocation time. 
+ pub fn can_evolve(&self) -> bool { + !self.upserts_e.is_empty() || !self.upserts_ev.is_empty() + } + + /// Evolve this generation one step further by rewriting the existing :db/add entities using the + /// given temporary IDs. + /// + /// TODO: Considering doing this in place; the function already consumes `self`. + pub fn evolve_one_step(self, temp_id_map: &TempIdMap) -> Generation { + let mut next = Generation::default(); + + for UpsertE(t, a, v) in self.upserts_e { + match temp_id_map.get(&*t) { + Some(&n) => next.upserted.push(Term::AddOrRetract(OpType::Add, n, a, v)), + None => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t), a, Ok(v))), + } + } + + for UpsertEV(t1, a, t2) in self.upserts_ev { + match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { + (Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2))), + (None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2))), + (Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Ok(n1), a, Err(t2))), + (None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t1), a, Err(t2))), + } + } + + // There's no particular need to separate resolved from allocations right here and right + // now, although it is convenient. + for term in self.allocations { + // TODO: find an expression that destructures less? I still expect this to be efficient + // but it's a little verbose. 
+ match term { + Term::AddOrRetract(op, Err(t1), a, Err(t2)) => { + match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { + (Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2))), + (None, Some(&n2)) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Ok(TypedValue::Ref(n2)))), + (Some(&n1), None) => next.allocations.push(Term::AddOrRetract(op, Ok(n1), a, Err(t2))), + (None, None) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Err(t2))), + } + }, + Term::AddOrRetract(op, Err(t), a, Ok(v)) => { + match temp_id_map.get(&*t) { + Some(&n) => next.resolved.push(Term::AddOrRetract(op, n, a, v)), + None => next.allocations.push(Term::AddOrRetract(op, Err(t), a, Ok(v))), + } + }, + Term::AddOrRetract(op, Ok(e), a, Err(t)) => { + match temp_id_map.get(&*t) { + Some(&n) => next.resolved.push(Term::AddOrRetract(op, e, a, TypedValue::Ref(n))), + None => next.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(t))), + } + }, + Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(), + } + } + + next + } + + // Collect id->[a v] pairs that might upsert at this evolutionary step. + pub fn temp_id_avs<'a>(&'a self) -> Vec<(TempId, AVPair)> { + let mut temp_id_avs: Vec<(TempId, AVPair)> = vec![]; + // TODO: map/collect. + for &UpsertE(ref t, ref a, ref v) in &self.upserts_e { + // TODO: figure out how to make this less expensive, i.e., don't require + // clone() of an arbitrary value. + temp_id_avs.push((t.clone(), (*a, v.clone()))); + } + temp_id_avs + } + + /// After evolution is complete, yield the set of tempids that require entid allocation. These + /// are the tempids that appeared in [:db/add ...] entities, but that didn't upsert to existing + /// entids. 
+ pub fn temp_ids_in_allocations(&self) -> BTreeSet { + assert!(self.upserts_e.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!"); + assert!(self.upserts_ev.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!"); + + let mut temp_ids: BTreeSet = BTreeSet::default(); + + for term in self.allocations.iter() { + match term { + &Term::AddOrRetract(OpType::Add, Err(ref t1), _, Err(ref t2)) => { + temp_ids.insert(t1.clone()); + temp_ids.insert(t2.clone()); + }, + &Term::AddOrRetract(OpType::Add, Err(ref t), _, Ok(_)) => { + temp_ids.insert(t.clone()); + }, + &Term::AddOrRetract(OpType::Add, Ok(_), _, Err(ref t)) => { + temp_ids.insert(t.clone()); + }, + &Term::AddOrRetract(OpType::Add, Ok(_), _, Ok(_)) => unreachable!(), + &Term::AddOrRetract(OpType::Retract, _, _, _) => { + // [:db/retract ...] entities never allocate entids; they have to resolve due to + // other upserts (or they fail the transaction). + }, + } + } + + temp_ids + } + + /// After evolution is complete, use the provided allocated entids to segment `self` into + /// populations, each with no references to tempids. + pub fn into_final_populations(self, temp_id_map: &TempIdMap) -> errors::Result { + assert!(self.upserts_e.is_empty()); + assert!(self.upserts_ev.is_empty()); + + let mut populations = FinalPopulations::default(); + + populations.upserted = self.upserted; + populations.resolved = self.resolved; + + for term in self.allocations { + let allocated = match term { + // TODO: consider require implementing require on temp_id_map. + Term::AddOrRetract(op, Err(t1), a, Err(t2)) => { + match (op, temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { + (op, Some(&n1), Some(&n2)) => Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2)), + (OpType::Add, _, _) => unreachable!(), // This is a coding error -- every tempid in a :db/add entity should resolve or be allocated. 
+ (OpType::Retract, _, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: one of {}, {}", t1, t2))), + } + }, + Term::AddOrRetract(op, Err(t), a, Ok(v)) => { + match (op, temp_id_map.get(&*t)) { + (op, Some(&n)) => Term::AddOrRetract(op, n, a, v), + (OpType::Add, _) => unreachable!(), // This is a coding error. + (OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))), + } + }, + Term::AddOrRetract(op, Ok(e), a, Err(t)) => { + match (op, temp_id_map.get(&*t)) { + (op, Some(&n)) => Term::AddOrRetract(op, e, a, TypedValue::Ref(n)), + (OpType::Add, _) => unreachable!(), // This is a coding error. + (OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))), + } + }, + Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(), // This is a coding error -- these should not be in allocations. + }; + populations.allocated.push(allocated); + } + + Ok(populations) + } +} From 96fd119426c3597ff8dbdcae34eb424008cf2288 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 8 Feb 2017 17:17:27 -0800 Subject: [PATCH 08/13] Post: Test upserting with vectors. This converts an existing test to EDN: https://github.com/mozilla/mentat/blob/84a80f40f5c888f8452d07bd15f3b5fba49d3963/test/datomish/db_test.cljc#L193. 
--- db/src/db.rs | 51 ++++++++++-- tx/fixtures/test_upsert_vector.edn | 121 +++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 tx/fixtures/test_upsert_vector.edn diff --git a/db/src/db.rs b/db/src/db.rs index d769de90b..95c550ecc 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1114,25 +1114,40 @@ mod tests { let assertions: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "assertions"))).unwrap().clone(); let expected_transaction: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-transaction"))); let expected_datoms: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-datoms"))); + let expected_error_message: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-error-message"))); let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - let report = db.transact(&conn, entities).unwrap(); - assert_eq!(report.tx_id, bootstrap::TX0 + index + 1); + let maybe_report = db.transact(&conn, entities); if let Some(expected_transaction) = expected_transaction { + if expected_transaction.is_nil() { + assert!(maybe_report.is_err()); + + if let Some(expected_error_message) = expected_error_message { + let expected_error_message = expected_error_message.as_text(); + assert!(expected_error_message.is_some(), "Expected error message to be text:\n{:?}", expected_error_message); + let error_message = maybe_report.unwrap_err().to_string(); + assert!(error_message.contains(expected_error_message.unwrap()), "Expected error message:\n{}\nto contain:\n{}", error_message, expected_error_message.unwrap()); + } + continue + } + + let report = maybe_report.unwrap(); + assert_eq!(report.tx_id, bootstrap::TX0 + index + 1); + let transactions = 
debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap(); assert_eq!(transactions.0.len(), 1); - assert_eq!(transactions.0[0].into_edn(), - *expected_transaction, - "\n{} - expected transaction:\n{}\n{}", label, transactions.0[0].into_edn(), *expected_transaction); + assert_eq!(*expected_transaction, + transactions.0[0].into_edn(), + "\n{} - expected transaction:\n{}\nbut got transaction:\n{}", label, *expected_transaction, transactions.0[0].into_edn()); } if let Some(expected_datoms) = expected_datoms { let datoms = debug::datoms_after(&conn, &db, bootstrap::TX0).unwrap(); - assert_eq!(datoms.into_edn(), - *expected_datoms, - "\n{} - expected datoms:\n{}\n{}", label, datoms.into_edn(), *expected_datoms); + assert_eq!(*expected_datoms, + datoms.into_edn(), + "\n{} - expected datoms:\n{}\nbut got datoms:\n{}", label, *expected_datoms, datoms.into_edn()) } // Don't allow empty tests. This will need to change if we allow transacting schema @@ -1182,4 +1197,24 @@ mod tests { let transactions = value.as_vector().unwrap(); assert_transactions(&conn, &mut db, transactions); } + + #[test] + fn test_upsert_vector() { + let mut conn = new_connection("").expect("Couldn't open in-memory db"); + let mut db = ensure_current_version(&mut conn).unwrap(); + + // Does not include :db/txInstant. + let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); + assert_eq!(datoms.0.len(), 88); + + // Includes :db/txInstant. 
+ let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); + assert_eq!(transactions.0.len(), 1); + assert_eq!(transactions.0[0].0.len(), 89); + + let value = edn::parse::value(include_str!("../../tx/fixtures/test_upsert_vector.edn")).unwrap(); + + let transactions = value.as_vector().unwrap(); + assert_transactions(&conn, &mut db, transactions); + } } diff --git a/tx/fixtures/test_upsert_vector.edn b/tx/fixtures/test_upsert_vector.edn new file mode 100644 index 000000000..41a55c730 --- /dev/null +++ b/tx/fixtures/test_upsert_vector.edn @@ -0,0 +1,121 @@ +[{:test/label ":db.cardinality/one, insert" + :test/assertions + [[:db/add 100 :db/ident :name/Ivan] + [:db/add 101 :db/ident :name/Petr]] + :test/expected-transaction + #{[100 :db/ident :name/Ivan ?tx1 true] + [101 :db/ident :name/Petr ?tx1 true] + [?tx1 :db/txInstant ?ms1 ?tx1 true]} + :test/expected-datoms + #{[100 :db/ident :name/Ivan] + [101 :db/ident :name/Petr]}} + + {:test/label "upsert two tempids to same entid" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + [:db/add "t1" :db.schema/attribute 100] + [:db/add "t2" :db/ident :name/Petr] + [:db/add "t2" :db.schema/attribute 101]] + :test/expected-transaction + #{[100 :db.schema/attribute 100 ?tx2 true] + [101 :db.schema/attribute 101 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]} + :test/expected-datoms + #{[100 :db/ident :name/Ivan] + [101 :db/ident :name/Petr] + [100 :db.schema/attribute 100] + [101 :db.schema/attribute 101]} + :test/expected-tempids + {"t1" 100 + "t2" 101}} + + {:test/label "upsert with tempid" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + ;; Ref doesn't have to exist (at this time). Can't reuse due to :db/unique :db.unique/value. 
+ [:db/add "t1" :db.schema/attribute 102]] + :test/expected-transaction + #{[100 :db.schema/attribute 102 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]} + :test/expected-datoms + #{[100 :db/ident :name/Ivan] + [101 :db/ident :name/Petr] + [100 :db.schema/attribute 100] + [100 :db.schema/attribute 102] + [101 :db.schema/attribute 101]} + :test/expected-tempids + {"t1" 100}} + + ;; TODO: don't hard-code allocated entids. + {:test/label "single complex upsert allocates new entid" + :test/assertions + [[:db/add "t1" :db.schema/attribute "t2"]] + :test/expected-transaction + #{[65536 :db.schema/attribute 65537 ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]} + :test/expected-tempids + {"t1" 65536 + "t2" 65537}} + + {:test/label "conflicting upserts fail" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + [:db/add "t1" :db/ident :name/Petr]] + :test/expected-transaction + nil + :test/expected-error-message + "Conflicting upsert" + ;; nil + } + + {:test/label "tempids in :db/retract that do upsert are fine" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + ;; This ref doesn't exist, so the assertion will be ignored. + [:db/retract "t1" :db.schema/attribute 103]] + :test/expected-transaction + #{[?tx6 :db/txInstant ?ms6 ?tx6 true]} + :test/expected-error-message + "" + :test/expected-tempids + {}} + + {:test/label "tempids in :db/retract that don't upsert fail" + :test/assertions + [[:db/retract "t1" :db/ident :name/Anonymous]] + :test/expected-transaction + nil + :test/expected-error-message + ""} + + ;; The upsert algorithm will first try to resolve "t1", fail, and then allocate both "t1" and "t2". 
+ {:test/label "multistep, both allocated" + :test/assertions + [[:db/add "t1" :db/ident :name/Josef] + [:db/add "t2" :db.schema/attribute "t1"]] + :test/expected-transaction + #{[65538 :db/ident :name/Josef ?tx8 true] + [65539 :db.schema/attribute 65538 ?tx8 true] + [?tx8 :db/txInstant ?ms8 ?tx8 true]} + :test/expected-error-message + "" + :test/expected-tempids + {"t1" 65538 + "t2" 65539}} + + ;; Can't quite test this without more schema elements. + ;; ;; This time, we can resolve both, but we have to try "t1", succeed, and then resolve "t2". + ;; {:test/label "multistep, upserted allocated" + ;; :test/assertions + ;; [[:db/add "t1" :db/ident :name/Josef] + ;; [:db/add "t2" :db/ident "t1"]] + ;; :test/expected-transaction + ;; #{[65538 :db/ident :name/Josef] + ;; [65538 :db/ident :name/Karl] + ;; [?tx8 :db/txInstant ?ms8 ?tx8 true]} + ;; :test/expected-error-message + ;; "" + ;; :test/expected-tempids + ;; {"t1" 65538 + ;; "t2" 65539}} + ] From 32c9f312a1e768bd39441fafe50cddcf810c2664 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Sat, 11 Feb 2017 12:06:43 -0800 Subject: [PATCH 09/13] Post: Separate Tx out of DB. This is very preliminary, since we don't have a real connection type to manage transactions and their metadata yet. 
--- db/src/db.rs | 254 +++------------------------------- db/src/lib.rs | 1 + db/src/tx.rs | 264 ++++++++++++++++++++++++++++++++++++ db/src/upsert_resolution.rs | 12 +- 4 files changed, 288 insertions(+), 243 deletions(-) create mode 100644 db/src/tx.rs diff --git a/db/src/db.rs b/db/src/db.rs index 95c550ecc..e2435b119 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -10,8 +10,7 @@ #![allow(dead_code)] -use std; -use std::collections::{BTreeSet, HashMap}; +use std::collections::HashMap; use std::iter::{once, repeat}; use std::ops::Range; use std::path::Path; @@ -24,7 +23,6 @@ use rusqlite::types::{ToSql, ToSqlOutput}; use ::{now, repeat_values, to_namespaced_keyword}; use bootstrap; use edn::types::Value; -use entids; use mentat_core::{ Attribute, AttributeBitFlags, @@ -34,9 +32,7 @@ use mentat_core::{ TypedValue, ValueType, }; -use mentat_core::intern_set; -use mentat_tx::entities as entmod; -use mentat_tx::entities::{Entity, OpType}; +use mentat_tx::entities::Entity; use errors::{ErrorKind, Result, ResultExt}; use schema::SchemaBuilding; use types::{ @@ -47,17 +43,7 @@ use types::{ PartitionMap, TxReport, }; -use internal_types::{ - replace_lookup_ref, - LookupRefOrTempId, - TempId, - TempIdMap, - Term, - TermWithTempIds, - TermWithTempIdsAndLookupRefs, - TermWithoutTempIds, -}; -use upsert_resolution::Generation; +use tx::Tx; pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { let conn = match uri.as_ref().to_string_lossy().len() { @@ -452,7 +438,7 @@ pub fn read_db(conn: &rusqlite::Connection) -> Result { } /// Internal representation of an [e a v added] datom, ready to be transacted against the store. -type ReducedEntity = (i64, i64, TypedValue, bool); +pub type ReducedEntity = (i64, i64, TypedValue, bool); #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] pub enum SearchType { @@ -557,38 +543,6 @@ impl DB { Ok(m) } - /// TODO: explain what this is doing. 
- pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result { - if temp_id_avs.is_empty() { - return Ok(TempIdMap::default()); - } - - // Map [a v]->entid. - let mut av_pairs: Vec<&AVPair> = vec![]; - for i in 0..temp_id_avs.len() { - av_pairs.push(&temp_id_avs[i].1); - } - - // Lookup in the store. - let av_map: AVMap = self.resolve_avs(conn, &av_pairs[..])?; - - // Map id->entid. - let mut temp_id_map: TempIdMap = TempIdMap::default(); - for &(ref temp_id, ref av_pair) in temp_id_avs { - if let Some(n) = av_map.get(&av_pair) { - if let Some(previous_n) = temp_id_map.get(&*temp_id) { - if n != previous_n { - // Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction. - bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX - } - } - temp_id_map.insert(temp_id.clone(), *n); - } - } - - Ok((temp_id_map)) - } - /// Create empty temporary tables for search parameters and search results. fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> { // We can't do this in one shot, since we can't prepare a batch statement. @@ -654,7 +608,7 @@ impl DB { /// /// Eventually, the details of this approach will be captured in /// https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> { + pub fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> { let bindings_per_statement = 7; let chunks: itertools::IntoChunks<_> = entities.into_iter().chunks(::SQLITE_MAX_VARIABLE_NUMBER / bindings_per_statement); @@ -718,7 +672,7 @@ impl DB { /// Take search rows and complete `temp.search_results`. 
/// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - fn search(&self, conn: &rusqlite::Connection) -> Result<()> { + pub fn search(&self, conn: &rusqlite::Connection) -> Result<()> { // First is fast, only one table walk: lookup by exact eav. // Second is slower, but still only one table walk: lookup old value by ea. let s = r#" @@ -751,7 +705,7 @@ impl DB { /// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. // TODO: capture `conn` in a `TxInternal` structure. - fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + pub fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { let s = r#" INSERT INTO transactions (e, a, v, tx, added, value_type_tag) SELECT e0, a0, v0, ?, 1, value_type_tag0 @@ -785,7 +739,7 @@ impl DB { /// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. // TODO: capture `conn` in a `TxInternal` structure. - fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + pub fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { // Delete datoms that were retracted, or those that were :db.cardinality/one and will be // replaced. let s = r#" @@ -827,7 +781,7 @@ impl DB { /// Update the current partition map materialized view. // TODO: only update changed partitions. - fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> { + pub fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> { let values_per_statement = 2; let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement; if self.partition_map.len() > max_partitions { @@ -853,12 +807,12 @@ impl DB { } /// Allocate a single fresh entid in the given `partition`. 
- fn allocate_entid(&mut self, partition: String) -> i64 { + pub fn allocate_entid(&mut self, partition: String) -> i64 { self.allocate_entids(partition, 1).start } /// Allocate `n` fresh entids in the given `partition`. - fn allocate_entids(&mut self, partition: String, n: usize) -> Range { + pub fn allocate_entids(&mut self, partition: String, n: usize) -> Range { match self.partition_map.get_mut(&partition) { Some(mut partition) => { let idx = partition.index; @@ -870,196 +824,22 @@ impl DB { } } - /// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term - /// rewriting. - /// - /// The `Term` instances produce share interned TempId and LookupRef handles. - fn entities_into_terms_with_temp_ids_and_lookup_refs(&self, entities: I) -> Result> where I: IntoIterator { - let mut temp_ids = intern_set::InternSet::new(); - - entities.into_iter() - .map(|entity: Entity| -> Result { - match entity { - Entity::AddOrRetract { op, e, a, v } => { - let a: i64 = match a { - entmod::Entid::Entid(ref a) => *a, - entmod::Entid::Ident(ref a) => self.schema.require_entid(&a.to_string())?, - }; - - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; - - let e = match e { - entmod::EntidOrLookupRefOrTempId::Entid(e) => { - let e: i64 = match e { - entmod::Entid::Entid(ref e) => *e, - entmod::Entid::Ident(ref e) => self.schema.require_entid(&e.to_string())?, - }; - std::result::Result::Ok(e) - }, - - entmod::EntidOrLookupRefOrTempId::TempId(e) => { - std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(e))) - }, - - entmod::EntidOrLookupRefOrTempId::LookupRef(_) => { - // TODO: reference entity and initial input. 
- bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) - }, - }; - - let v = { - if attribute.value_type == ValueType::Ref && v.is_text() { - std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone()))) - } else if attribute.value_type == ValueType::Ref && v.is_vector() && v.as_vector().unwrap().len() == 2 { - bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) - } else { - // Here is where we do schema-aware typechecking: we either assert that - // the given value is in the attribute's value set, or (in limited - // cases) coerce the value into the attribute's value set. - let typed_value: TypedValue = self.to_typed_value(&v, &attribute)?; - - std::result::Result::Ok(typed_value) - } - }; - - Ok(Term::AddOrRetract(op, e, a, v)) - }, - } - }) - .collect::>>() - } - - /// Pipeline stage 2: rewrite `Term` instances with lookup refs into `Term` instances without - /// lookup refs. - /// - /// The `Term` instances produce share interned TempId handles and have no LookupRef references. - fn resolve_lookup_refs(&self, lookup_ref_map: &AVMap, terms: I) -> Result> where I: IntoIterator { - terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result { - match term { - Term::AddOrRetract(op, e, a, v) => { - let e = replace_lookup_ref(&lookup_ref_map, e, |x| x)?; - let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?; - Ok(Term::AddOrRetract(op, e, a, v)) - }, - } - }).collect::>>() - } - /// Transact the given `entities` against the given SQLite `conn`, using the metadata in /// `self.DB`. /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. pub fn transact(&mut self, conn: &rusqlite::Connection, entities: I) -> Result where I: IntoIterator { - let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. 
- let tx = self.allocate_entid(":db.part/tx".to_string()); - - // Eventually, this will be responsible for managing a SQLite transaction. For now, it's - // just about the tx details. - self.transact_internal(conn, entities, tx, tx_instant) - } - - /// Transact the given `entities` against the given SQLite `conn`, using the metadata in - /// `self.DB`. - /// - /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. - // TODO: move this to the transactor layer. - pub fn transact_internal(&mut self, conn: &rusqlite::Connection, entities: I, tx_id: Entid, tx_instant: i64) -> Result where I: IntoIterator { - // TODO: push these into an internal transaction report? - - /// Assertions that are :db.cardinality/one and not :db.fulltext. - let mut non_fts_one: Vec = vec![]; - - /// Assertions that are :db.cardinality/many and not :db.fulltext. - let mut non_fts_many: Vec = vec![]; - - // Transact [:db/add :db/txInstant NOW :db/tx]. - // TODO: allow this to be present in the transaction data. - non_fts_one.push((tx_id, - entids::DB_TX_INSTANT, - TypedValue::Long(tx_instant), - true)); - - // We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable. - let lookup_refs: intern_set::InternSet = intern_set::InternSet::new(); - - // TODO: extract the tempids set as well. - // Pipeline stage 1: entities -> terms with tempids and lookup refs. - let terms_with_temp_ids_and_lookup_refs = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; - - // Pipeline stage 2: resolve lookup refs -> terms with tempids. - let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect(); - let lookup_ref_map: AVMap = self.resolve_avs(conn, &lookup_ref_avs[..])?; - - let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; - - // Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs. - // Now we can collect upsert populations. 
- let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, self)?; - - // And evolve them forward. - while generation.can_evolve() { - // Evolve further. - let temp_id_map = self.resolve_temp_id_avs(conn, &generation.temp_id_avs()[..])?; - generation = generation.evolve_one_step(&temp_id_map); - } - - // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic. - let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); - - // TODO: track partitions for temporary IDs. - let entids = self.allocate_entids(":db.part/user".to_string(), unresolved_temp_ids.len()); - - let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); + // Eventually, this function will be responsible for managing a SQLite transaction. For + // now, it's just about the tx details. - let final_populations = generation.into_final_populations(&temp_id_allocations)?; - let final_terms: Vec = [final_populations.resolved, - final_populations.allocated, - inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); - - // Pipeline stage 4: final terms (after rewriting) -> DB insertions. - // Collect into non_fts_*. - // TODO: use something like Clojure's group_by to do this. - for term in final_terms { - match term { - Term::AddOrRetract(op, e, a, v) => { - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; - if attribute.fulltext { - bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult! - } - - let added = op == OpType::Add; - if attribute.multival { - non_fts_many.push((e, a, v, added)); - } else { - non_fts_one.push((e, a, v, added)); - } - }, - } - } + let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. 
+ let tx_id = self.allocate_entid(":db.part/tx".to_string()); self.create_temp_tables(conn)?; - if !non_fts_one.is_empty() { - self.insert_non_fts_searches(conn, &non_fts_one[..], tx_id, SearchType::Inexact)?; - } - - if !non_fts_many.is_empty() { - self.insert_non_fts_searches(conn, &non_fts_many[..], tx_id, SearchType::Exact)?; - } - - self.search(conn)?; - - self.insert_transaction(conn, tx_id)?; - self.update_datoms(conn, tx_id)?; - - // TODO: update idents and schema materialized views. - self.update_partition_map(conn)?; - - Ok(TxReport { - tx_id: tx_id, - tx_instant: tx_instant, - }) + let mut tx = Tx::new(self, conn, tx_id, tx_instant); + tx.transact_entities(entities) } } diff --git a/db/src/lib.rs b/db/src/lib.rs index 12b18dc5f..fd8dca7ac 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -36,6 +36,7 @@ mod types; mod internal_types; mod upsert_resolution; mod values; +mod tx; pub use types::DB; diff --git a/db/src/tx.rs b/db/src/tx.rs new file mode 100644 index 000000000..f9e9bedd6 --- /dev/null +++ b/db/src/tx.rs @@ -0,0 +1,264 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +#![allow(dead_code)] + +use std; +use std::collections::BTreeSet; + +use db::{ReducedEntity, SearchType}; +use entids; +use errors::*; +use internal_types::{LookupRefOrTempId, TempId, TempIdMap, Term, TermWithTempIdsAndLookupRefs, TermWithTempIds, TermWithoutTempIds, replace_lookup_ref}; +use mentat_core::intern_set; +use mentat_tx::entities as entmod; +use mentat_tx::entities::{Entity, OpType}; +use rusqlite; +use schema::SchemaBuilding; +use types::*; +use upsert_resolution::Generation; + +/// A transaction on its way to being applied. +#[derive(Debug)] +pub struct Tx<'conn> { + /// The metadata to use to interpret the transaction entities with. + pub db: &'conn mut DB, + + /// The SQLite connection to apply against. In the future, this will be a Mentat connection. + pub conn: &'conn rusqlite::Connection, + + /// The transaction ID of the transaction. + pub tx_id: Entid, + + /// The timestamp when the transaction began to be commited. + /// + /// This is milliseconds after the Unix epoch according to the transactor's local clock. + // TODO: :db.type/instant. + pub tx_instant: i64, +} + +impl<'conn> Tx<'conn> { + pub fn new(db: &'conn mut DB, conn: &'conn rusqlite::Connection, tx_id: Entid, tx_instant: i64) -> Tx<'conn> { + Tx { db: db, + conn: conn, + tx_id: tx_id, + tx_instant: tx_instant } + } + + /// Given a collection of tempids and the [a v] pairs that they might upsert to, resolve exactly + /// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted + /// entid. The keys of the resulting map are exactly those tempids that upserted. + pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result { + if temp_id_avs.is_empty() { + return Ok(TempIdMap::default()); + } + + // Map [a v]->entid. + let mut av_pairs: Vec<&AVPair> = vec![]; + for i in 0..temp_id_avs.len() { + av_pairs.push(&temp_id_avs[i].1); + } + + // Lookup in the store. 
+ let av_map: AVMap = self.db.resolve_avs(conn, &av_pairs[..])?; + + // Map id->entid. + let mut temp_id_map: TempIdMap = TempIdMap::default(); + for &(ref temp_id, ref av_pair) in temp_id_avs { + if let Some(n) = av_map.get(&av_pair) { + if let Some(previous_n) = temp_id_map.get(&*temp_id) { + if n != previous_n { + // Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction. + bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX + } + } + temp_id_map.insert(temp_id.clone(), *n); + } + } + + Ok((temp_id_map)) + } + + /// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term + /// rewriting. + /// + /// The `Term` instances produce share interned TempId and LookupRef handles. + fn entities_into_terms_with_temp_ids_and_lookup_refs(&self, entities: I) -> Result> where I: IntoIterator { + let mut temp_ids = intern_set::InternSet::new(); + + entities.into_iter() + .map(|entity: Entity| -> Result { + match entity { + Entity::AddOrRetract { op, e, a, v } => { + let a: i64 = match a { + entmod::Entid::Entid(ref a) => *a, + entmod::Entid::Ident(ref a) => self.db.schema.require_entid(&a.to_string())?, + }; + + let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?; + + let e = match e { + entmod::EntidOrLookupRefOrTempId::Entid(e) => { + let e: i64 = match e { + entmod::Entid::Entid(ref e) => *e, + entmod::Entid::Ident(ref e) => self.db.schema.require_entid(&e.to_string())?, + }; + std::result::Result::Ok(e) + }, + + entmod::EntidOrLookupRefOrTempId::TempId(e) => { + std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(e))) + }, + + entmod::EntidOrLookupRefOrTempId::LookupRef(_) => { + // TODO: reference entity and initial input. 
+ bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) + }, + }; + + let v = { + if attribute.value_type == ValueType::Ref && v.is_text() { + std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone()))) + } else if attribute.value_type == ValueType::Ref && v.is_vector() && v.as_vector().unwrap().len() == 2 { + bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) + } else { + // Here is where we do schema-aware typechecking: we either assert that + // the given value is in the attribute's value set, or (in limited + // cases) coerce the value into the attribute's value set. + let typed_value: TypedValue = self.db.to_typed_value(&v, &attribute)?; + + std::result::Result::Ok(typed_value) + } + }; + + Ok(Term::AddOrRetract(op, e, a, v)) + }, + } + }) + .collect::>>() + } + + /// Pipeline stage 2: rewrite `Term` instances with lookup refs into `Term` instances without + /// lookup refs. + /// + /// The `Term` instances produce share interned TempId handles and have no LookupRef references. + fn resolve_lookup_refs(&self, lookup_ref_map: &AVMap, terms: I) -> Result> where I: IntoIterator { + terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result { + match term { + Term::AddOrRetract(op, e, a, v) => { + let e = replace_lookup_ref(&lookup_ref_map, e, |x| x)?; + let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?; + Ok(Term::AddOrRetract(op, e, a, v)) + }, + } + }).collect::>>() + } + + /// Transact the given `entities` against the given SQLite `conn`, using the metadata in + /// `self.DB`. + /// + /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. + // TODO: move this to the transactor layer. + pub fn transact_entities(&mut self, entities: I) -> Result where I: IntoIterator { + // TODO: push these into an internal transaction report? 
+ + /// Assertions that are :db.cardinality/one and not :db.fulltext. + let mut non_fts_one: Vec = vec![]; + + /// Assertions that are :db.cardinality/many and not :db.fulltext. + let mut non_fts_many: Vec = vec![]; + + // Transact [:db/add :db/txInstant NOW :db/tx]. + // TODO: allow this to be present in the transaction data. + non_fts_one.push((self.tx_id, + entids::DB_TX_INSTANT, + TypedValue::Long(self.tx_instant), + true)); + + // We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable. + let lookup_refs: intern_set::InternSet = intern_set::InternSet::new(); + + // TODO: extract the tempids set as well. + // Pipeline stage 1: entities -> terms with tempids and lookup refs. + let terms_with_temp_ids_and_lookup_refs = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; + + // Pipeline stage 2: resolve lookup refs -> terms with tempids. + let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect(); + let lookup_ref_map: AVMap = self.db.resolve_avs(self.conn, &lookup_ref_avs[..])?; + + let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; + + // Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs. + // Now we can collect upsert populations. + let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.db.schema)?; + + // And evolve them forward. + while generation.can_evolve() { + // Evolve further. + let temp_id_map = self.resolve_temp_id_avs(self.conn, &generation.temp_id_avs()[..])?; + generation = generation.evolve_one_step(&temp_id_map); + } + + // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic. + let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); + + // TODO: track partitions for temporary IDs. 
+ let entids = self.db.allocate_entids(":db.part/user".to_string(), unresolved_temp_ids.len()); + + let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); + + let final_populations = generation.into_final_populations(&temp_id_allocations)?; + let final_terms: Vec = [final_populations.resolved, + final_populations.allocated, + inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + + // Pipeline stage 4: final terms (after rewriting) -> DB insertions. + // Collect into non_fts_*. + // TODO: use something like Clojure's group_by to do this. + for term in final_terms { + match term { + Term::AddOrRetract(op, e, a, v) => { + let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?; + if attribute.fulltext { + bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult! + } + + let added = op == OpType::Add; + if attribute.multival { + non_fts_many.push((e, a, v, added)); + } else { + non_fts_one.push((e, a, v, added)); + } + }, + } + } + + if !non_fts_one.is_empty() { + self.db.insert_non_fts_searches(self.conn, &non_fts_one[..], self.tx_id, SearchType::Inexact)?; + } + + if !non_fts_many.is_empty() { + self.db.insert_non_fts_searches(self.conn, &non_fts_many[..], self.tx_id, SearchType::Exact)?; + } + + self.db.search(self.conn)?; + + self.db.insert_transaction(self.conn, self.tx_id)?; + self.db.update_datoms(self.conn, self.tx_id)?; + + // TODO: update idents and schema materialized views. 
+ self.db.update_partition_map(self.conn)?; + + Ok(TxReport { + tx_id: self.tx_id, + tx_instant: self.tx_instant, + }) + } +} diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs index 5c34b17c8..1262821b5 100644 --- a/db/src/upsert_resolution.rs +++ b/db/src/upsert_resolution.rs @@ -18,7 +18,7 @@ use std::collections::BTreeSet; use mentat_tx::entities::OpType; use errors; use errors::ErrorKind; -use types::{Attribute, AVPair, DB, Entid, TypedValue}; +use types::{Attribute, AVPair, Entid, Schema, TypedValue}; use internal_types::*; use schema::SchemaBuilding; @@ -74,26 +74,26 @@ pub struct FinalPopulations { impl Generation { /// Split entities into a generation of populations that need to evolve to have their tempids /// resolved or allocated, and a population of inert entities that do not reference tempids. - pub fn from(terms: I, db: &DB) -> errors::Result<(Generation, Population)> where I: IntoIterator { + pub fn from(terms: I, schema: &Schema) -> errors::Result<(Generation, Population)> where I: IntoIterator { let mut generation = Generation::default(); let mut inert = vec![]; - let is_unique = |a: &Entid| -> errors::Result { - let attribute: &Attribute = db.schema.require_attribute_for_entid(*a)?; + let is_unique = |a: Entid| -> errors::Result { + let attribute: &Attribute = schema.require_attribute_for_entid(a)?; Ok(attribute.unique_identity) }; for term in terms.into_iter() { match term { Term::AddOrRetract(op, Err(e), a, Err(v)) => { - if op == OpType::Add && is_unique(&a)? { + if op == OpType::Add && is_unique(a)? { generation.upserts_ev.push(UpsertEV(e, a, v)); } else { generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Err(v))); } }, Term::AddOrRetract(op, Err(e), a, Ok(v)) => { - if op == OpType::Add && is_unique(&a)? { + if op == OpType::Add && is_unique(a)? 
{ generation.upserts_e.push(UpsertE(e, a, v)); } else { generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Ok(v))); From 558d41938bb85762264940db86c23ffe6957fdbd Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Sun, 12 Feb 2017 14:22:20 -0800 Subject: [PATCH 10/13] Post: Comment on implementation choices in the transactor. --- db/src/tx.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/db/src/tx.rs b/db/src/tx.rs index f9e9bedd6..1fb1562d3 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -10,6 +10,41 @@ #![allow(dead_code)] +//! This module implements the transaction application algorithm described at +//! https://github.com/mozilla/mentat/wiki/Transacting and its children pages. +//! +//! The implementation proceeds in four main stages, labeled "Pipeline stage 1" through "Pipeline +//! stage 4". _Pipeline_ may be a misnomer, since the stages as written **cannot** be interleaved +//! in parallel. That is, a single transacted entity cannot flow through all the stages without its +//! sibling entities. +//! +//! This unintuitive architectural decision was made because the second and third stages (resolving +//! lookup refs and tempids, respectively) operate _in bulk_ to minimize the number of expensive +//! SQLite queries by processing many in one SQLite invocation. Pipeline stage 2 doesn't need to +//! operate like this: it is easy to handle each transacted entity independently of all the others +//! (and earlier, less efficient, implementations did this). However, Pipeline stage 3 appears to +//! require processing multiple elements at the same time, since there can be arbitrarily complex +//! graph relationships between tempids. Pipeline stage 4 (inserting elements into the SQL store) +//! could also be expressed as an independent operation per transacted entity, but there are +//! non-trivial uniqueness relationships inside a single transaction that need to be enforced. +//! 
Therefore, some multi-entity processing is required, and a per-entity pipeline becomes less +//! attractive. +//! +//! A note on the types in the implementation. The pipeline stages are strongly typed: each stage +//! accepts and produces a subset of the previous. We hope this will reduce errors as data moves +//! through the system. In contrast the Clojure implementation rewrote the fundamental entity type +//! in place and suffered bugs where particular code paths missed cases. +//! +//! The type hierarchy accepts `Entity` instances from the transaction parser and flows `Term` +//! instances through the term-rewriting transaction applier. `Term` is a general `[:db/add e a v]` +//! with restrictions on the `e` and `v` components. The hierarchy is expressed using `Result` to +//! model either/or, and layers of `Result` are stripped -- we might say the `Term` instances are +//! _lowered_ as they flow through the pipeline. This type hierarchy could have been expressed by +//! combinatorially increasing `enum` cases, but this makes it difficult to handle the `e` and `v` +//! components symmetrically. Hence, layers of `Result` type aliases. Hopefully the explanatory +//! names -- `TermWithTempIdsAndLookupRefs`, anyone? -- and strongly typed stage functions will help +//! keep everything straight. + use std; use std::collections::BTreeSet; From f2c56cb419db9925042c2524aebead0ddf683fd0 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 14 Feb 2017 13:13:28 -0800 Subject: [PATCH 11/13] Review comment: Put long use lists on separate lines. 
--- db/src/tx.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/db/src/tx.rs b/db/src/tx.rs index 1fb1562d3..ac8cefb45 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -51,7 +51,15 @@ use std::collections::BTreeSet; use db::{ReducedEntity, SearchType}; use entids; use errors::*; -use internal_types::{LookupRefOrTempId, TempId, TempIdMap, Term, TermWithTempIdsAndLookupRefs, TermWithTempIds, TermWithoutTempIds, replace_lookup_ref}; +use internal_types::{ + LookupRefOrTempId, + TempId, + TempIdMap, + Term, + TermWithTempIdsAndLookupRefs, + TermWithTempIds, + TermWithoutTempIds, + replace_lookup_ref}; use mentat_core::intern_set; use mentat_tx::entities as entmod; use mentat_tx::entities::{Entity, OpType}; From 97a647bbe2453008201c6259c9248d38595a1325 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 14 Feb 2017 13:31:28 -0800 Subject: [PATCH 12/13] Review comment: Accept String: Borrow instead of just String. --- db/src/db.rs | 10 ++++++---- db/src/tx.rs | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index e2435b119..8e55d7262 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -10,7 +10,9 @@ #![allow(dead_code)] +use std::borrow::Borrow; use std::collections::HashMap; +use std::fmt::Display; use std::iter::{once, repeat}; use std::ops::Range; use std::path::Path; @@ -807,13 +809,13 @@ impl DB { } /// Allocate a single fresh entid in the given `partition`. - pub fn allocate_entid(&mut self, partition: String) -> i64 { + pub fn allocate_entid(&mut self, partition: &S) -> i64 where String: Borrow { self.allocate_entids(partition, 1).start } /// Allocate `n` fresh entids in the given `partition`. 
- pub fn allocate_entids(&mut self, partition: String, n: usize) -> Range { - match self.partition_map.get_mut(&partition) { + pub fn allocate_entids(&mut self, partition: &S, n: usize) -> Range where String: Borrow { + match self.partition_map.get_mut(partition) { Some(mut partition) => { let idx = partition.index; partition.index += n as i64; @@ -834,7 +836,7 @@ impl DB { // now, it's just about the tx details. let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. - let tx_id = self.allocate_entid(":db.part/tx".to_string()); + let tx_id = self.allocate_entid(":db.part/tx"); self.create_temp_tables(conn)?; diff --git a/db/src/tx.rs b/db/src/tx.rs index ac8cefb45..c8b978b63 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -253,7 +253,7 @@ impl<'conn> Tx<'conn> { let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); // TODO: track partitions for temporary IDs. - let entids = self.db.allocate_entids(":db.part/user".to_string(), unresolved_temp_ids.len()); + let entids = self.db.allocate_entids(":db.part/user", unresolved_temp_ids.len()); let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); From f715a27bf94281a8f0687d96b0e15fde1a1a7f70 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 14 Feb 2017 13:44:43 -0800 Subject: [PATCH 13/13] Review comment: Address nits. --- db/src/db.rs | 4 ++-- db/src/tx.rs | 12 +++++++----- db/src/types.rs | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 8e55d7262..9aa6b1050 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -207,7 +207,7 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { tx.execute("INSERT INTO parts VALUES (?, ?, ?)", &[part, &partition.start, &partition.index])?; } - // TODO: return to transact_internal to self manage the encompassing SQLite transaction. 
+ // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); bootstrap_db.transact(&tx, bootstrap::bootstrap_entities())?; @@ -800,7 +800,7 @@ impl DB { }).collect(); // TODO: only cache the latest of these statements. Changing the set of partitions isn't - // supported in the Clojure implementationat all, and might not be supported in Mentat soon, + // supported in the Clojure implementation at all, and might not be supported in Mentat soon, // so this is very low priority. let mut stmt = conn.prepare_cached(s.as_str())?; stmt.execute(&params[..]) diff --git a/db/src/tx.rs b/db/src/tx.rs index c8b978b63..14933ce5b 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -80,7 +80,7 @@ pub struct Tx<'conn> { /// The transaction ID of the transaction. pub tx_id: Entid, - /// The timestamp when the transaction began to be commited. + /// The timestamp when the transaction began to be committed. /// /// This is milliseconds after the Unix epoch according to the transactor's local clock. // TODO: :db.type/instant. @@ -89,10 +89,12 @@ pub struct Tx<'conn> { impl<'conn> Tx<'conn> { pub fn new(db: &'conn mut DB, conn: &'conn rusqlite::Connection, tx_id: Entid, tx_instant: i64) -> Tx<'conn> { - Tx { db: db, - conn: conn, - tx_id: tx_id, - tx_instant: tx_instant } + Tx { + db: db, + conn: conn, + tx_id: tx_id, + tx_instant: tx_instant, + } } /// Given a collection of tempids and the [a v] pairs that they might upsert to, resolve exactly diff --git a/db/src/types.rs b/db/src/types.rs index 4374d7661..0403f47a7 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -83,7 +83,7 @@ pub struct TxReport { /// The transaction ID of the transaction. pub tx_id: Entid, - /// The timestamp when the transaction was commited. + /// The timestamp when the transaction began to be committed. /// /// This is milliseconds after the Unix epoch according to the transactor's local clock. 
// TODO: :db.type/instant.