diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 835370dfb25..f777d66e74b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3200,6 +3200,7 @@ dependencies = [ "anyhow", "colored", "env_logger", + "fastfield_codecs", "home", "itertools", "num_cpus", @@ -3210,6 +3211,7 @@ dependencies = [ "regex", "serde", "serde_json", + "tantivy", "tempfile", "tokio", "tracing", diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 484723e518e..3d78783539b 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -13,6 +13,7 @@ documentation = "https://quickwit.io/docs/" anyhow = { workspace = true } colored = { workspace = true } env_logger = { workspace = true } +fastfield_codecs = { workspace = true } home = { workspace = true } itertools = { workspace = true } num_cpus = { workspace = true } @@ -22,6 +23,7 @@ prometheus = { workspace = true } rand = { workspace = true } regex = { workspace = true } serde = { workspace = true } +tantivy = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } warp = { workspace = true } diff --git a/quickwit/quickwit-common/src/fast_field_reader.rs b/quickwit/quickwit-common/src/fast_field_reader.rs new file mode 100644 index 00000000000..e8372004014 --- /dev/null +++ b/quickwit/quickwit-common/src/fast_field_reader.rs @@ -0,0 +1,80 @@ +// Copyright (C) 2022 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::sync::Arc; + +use tantivy::fastfield::{Column, FastFieldReaders}; +use tantivy::schema::{Field, FieldEntry, Type}; +use tantivy::{DateTime, DocId, TantivyError}; + +#[derive(Clone)] +pub enum GenericFastFieldReader { + I64(Arc>), + Date(Arc>), +} + +impl GenericFastFieldReader { + pub fn min_value(&self) -> i64 { + match self { + GenericFastFieldReader::I64(fast_reader) => fast_reader.min_value(), + GenericFastFieldReader::Date(fast_reader) => { + fast_reader.min_value().into_timestamp_secs() + } + } + } + + pub fn max_value(&self) -> i64 { + match self { + GenericFastFieldReader::I64(fast_reader) => fast_reader.max_value(), + GenericFastFieldReader::Date(fast_reader) => { + fast_reader.max_value().into_timestamp_secs() + } + } + } + + pub fn get(&self, doc_id: DocId) -> i64 { + match self { + GenericFastFieldReader::I64(fast_reader) => fast_reader.get_val(doc_id as u64), + GenericFastFieldReader::Date(fast_reader) => { + fast_reader.get_val(doc_id as u64).into_timestamp_secs() + } + } + } +} + +pub fn timestamp_field_reader( + timestamp_field: Field, + timestamp_field_entry: &FieldEntry, + fast_field_readers: &FastFieldReaders, +) -> tantivy::Result { + let field_schema_type = timestamp_field_entry.field_type().value_type(); + let timestamp_field_reader = match field_schema_type { + Type::I64 => GenericFastFieldReader::I64(fast_field_readers.i64(timestamp_field)?), + Type::Date => GenericFastFieldReader::Date(fast_field_readers.date(timestamp_field)?), + _ => { + return Err(TantivyError::SchemaError(format!( + "Failed to build timestamp filter for field `{:?}`: expected I64 or Date type, \ + got `{:?}`.", + timestamp_field_entry.name(), + field_schema_type + ))) + } + }; + Ok(timestamp_field_reader) +} diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index e23f1879b76..90683d39b78 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -20,6 +20,7 @@ mod checklist; mod coolid; +pub mod fast_field_reader; pub mod fs; pub mod metrics; pub mod net; diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 644b0ef1e90..e8d07e64731 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -28,6 +28,7 @@ use async_trait::async_trait; use fail::fail_point; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::fast_field_reader::timestamp_field_reader; use quickwit_common::runtimes::RuntimeType; use quickwit_config::build_doc_mapper; use quickwit_directories::UnionDirectory; @@ -449,7 +450,14 @@ impl MergeExecutor { timestamp_field_name )) })?; - let reader = merged_segment_reader.fast_fields().i64(timestamp_field)?; + let timestamp_field_entry = merged_segment_reader + .schema() + .get_field_entry(timestamp_field); + let reader = timestamp_field_reader( + timestamp_field, + timestamp_field_entry, + merged_segment_reader.fast_fields(), + )?; Some(RangeInclusive::new(reader.min_value(), reader.max_value())) } else { None @@ -512,13 +520,16 @@ mod tests { - name: body type: text - name: ts - type: i64 + type: datetime + input_formats: + - unix_ts_millis fast: true "#; + let indexing_settings_yaml = "timestamp_field: ts"; let test_sandbox = TestSandbox::create( &pipeline_id.index_id, doc_mapping_yaml, - "{}", + indexing_settings_yaml, &["body"], None, ) @@ -614,11 +625,20 @@ mod tests { - name: body type: text - name: ts - type: i64 + type: datetime + input_formats: + - unix_ts_millis fast: true "#; - let test_sandbox = - TestSandbox::create(index_id, doc_mapping_yaml, "{}", &["body"], None).await?; + let indexing_settings_yaml = "timestamp_field: ts"; + let test_sandbox = TestSandbox::create( + index_id, + doc_mapping_yaml, + indexing_settings_yaml, + &["body"], + None, + ) + .await?; let docs = vec![ serde_json::json!({"body": "info", "ts": 0 }), serde_json::json!({"body": "info", "ts": 0 }), diff --git a/quickwit/quickwit-search/src/filters.rs b/quickwit/quickwit-search/src/filters.rs index 135e351e516..682137ac5af 100644 --- a/quickwit/quickwit-search/src/filters.rs +++ b/quickwit/quickwit-search/src/filters.rs @@ -18,46 +18,10 @@ // along with this program. If not, see . use std::ops::{Bound, RangeBounds}; -use std::sync::Arc; -use tantivy::fastfield::Column; -use tantivy::schema::{Field, Type}; -use tantivy::{DateTime, DocId, SegmentReader, TantivyError}; - -#[derive(Clone)] -enum GenericFastFieldReader { - I64(Arc>), - Date(Arc>), -} - -impl GenericFastFieldReader { - fn min_value(&self) -> i64 { - match self { - GenericFastFieldReader::I64(fast_reader) => fast_reader.min_value(), - GenericFastFieldReader::Date(fast_reader) => { - fast_reader.min_value().into_timestamp_secs() - } - } - } - - fn max_value(&self) -> i64 { - match self { - GenericFastFieldReader::I64(fast_reader) => fast_reader.max_value(), - GenericFastFieldReader::Date(fast_reader) => { - fast_reader.max_value().into_timestamp_secs() - } - } - } - - fn get(&self, doc_id: DocId) -> i64 { - match self { - GenericFastFieldReader::I64(fast_reader) => fast_reader.get_val(doc_id as u64), - GenericFastFieldReader::Date(fast_reader) => { - fast_reader.get_val(doc_id as u64).into_timestamp_secs() - } - } - } -} +use quickwit_common::fast_field_reader::{timestamp_field_reader, GenericFastFieldReader}; +use tantivy::schema::Field; +use tantivy::{DocId, SegmentReader}; /// A filter that only retains docs within a time range. #[derive(Clone)] @@ -107,28 +71,14 @@ impl TimestampFilterBuilder { &self, segment_reader: &SegmentReader, ) -> tantivy::Result> { - let field_entry = segment_reader + let timestamp_field_entry = segment_reader .schema() .get_field_entry(self.timestamp_field); - - let field_schema_type = field_entry.field_type().value_type(); - let timestamp_field_reader = match field_entry.field_type().value_type() { - Type::I64 => { - GenericFastFieldReader::I64(segment_reader.fast_fields().i64(self.timestamp_field)?) - } - Type::Date => GenericFastFieldReader::Date( - segment_reader.fast_fields().date(self.timestamp_field)?, - ), - _ => { - return Err(TantivyError::SchemaError(format!( - "Failed to build timestamp filter for field `{:?}`: expected I64 or Date \ - type, got `{:?}`.", - field_entry.name(), - field_schema_type - ))) - } - }; - + let timestamp_field_reader = timestamp_field_reader( + self.timestamp_field, + timestamp_field_entry, + segment_reader.fast_fields(), + )?; let segment_range = ( timestamp_field_reader.min_value(), timestamp_field_reader.max_value(),