From 9221d32245ab06525bc5d2f8686b6ce019d4c0a9 Mon Sep 17 00:00:00 2001 From: Akshay Date: Thu, 9 Nov 2023 15:52:29 +0000 Subject: [PATCH 1/4] add repo sync analytics tracks repo metadata on first index, or on full reindex (when the tantivy index hash is updated, for example), includes the following info: - repo ref - repo provider - size in bytes - number of qdrant chunks - time taken to index sample event: RepoEvent { name: "index", payload: [ ("repo_ref", String("nerdypepper/dijo")), ("provider", String("github")), ("file_count", Number(35)), ("chunk_count", Number(236)), ("bytes", String("116.59KB")), ("sync_time", String("4.957807429s")), ], } --- to test this changeset, set the following values in `local_config.json`: { "analytics_key": "..", "analytics_data_plane": ".." } run rudderstack in "live event stream" mode, and index a repository. --- server/bleep/src/analytics.rs | 41 +++++ server/bleep/src/background/sync.rs | 2 +- server/bleep/src/cache.rs | 25 ++- server/bleep/src/indexes/file.rs | 240 +++++++++++++++++++--------- server/bleep/src/repo/iterator.rs | 11 ++ 5 files changed, 239 insertions(+), 80 deletions(-) diff --git a/server/bleep/src/analytics.rs b/server/bleep/src/analytics.rs index 14869d66ff..ed7cbf3e13 100644 --- a/server/bleep/src/analytics.rs +++ b/server/bleep/src/analytics.rs @@ -73,6 +73,36 @@ impl DocEvent { } } +#[derive(Debug, Clone)] +pub struct RepoEvent { + pub name: String, + pub payload: Vec<(String, Value)>, +} + +impl RepoEvent { + pub fn new(name: &str) -> Self { + Self { + name: name.to_owned(), + payload: vec![], + } + } + + pub fn with_payload(mut self, name: &str, payload: &T) -> Self { + self.payload.push(( + name.to_owned(), + serde_json::to_value(payload.clone()).unwrap(), + )); + self + } + + pub fn add_payload(&mut self, name: &str, payload: &T) { + self.payload.push(( + name.to_owned(), + serde_json::to_value(payload.clone()).unwrap(), + )); + } +} + #[derive(Debug, Clone, Serialize)] pub struct EventData { kind: 
EventKind, @@ -261,6 +291,17 @@ impl RudderHub { ..Default::default() })); } + + pub fn track_repo(&self, event: RepoEvent, user: &crate::webserver::middleware::User) { + self.send(Message::Track(Track { + user_id: Some(self.tracking_id(user.username())), + event: "track_repo_index".into(), + properties: Some(serde_json::json!({ + "payload": event.payload + })), + ..Default::default() + })); + } } impl From> for DeviceId { diff --git a/server/bleep/src/background/sync.rs b/server/bleep/src/background/sync.rs index 6456d1614c..e1e65a2b3e 100644 --- a/server/bleep/src/background/sync.rs +++ b/server/bleep/src/background/sync.rs @@ -19,7 +19,7 @@ pub struct SyncHandle { pub(crate) filter_updates: FilterUpdate, pub(crate) pipes: SyncPipes, pub(crate) file_cache: FileCache, - app: Application, + pub(crate) app: Application, exited: flume::Sender, exit_signal: flume::Receiver, } diff --git a/server/bleep/src/cache.rs b/server/bleep/src/cache.rs index 644419cfda..517267dfb2 100644 --- a/server/bleep/src/cache.rs +++ b/server/bleep/src/cache.rs @@ -150,6 +150,19 @@ pub struct FileCache { embed_queue: EmbedQueue, } +#[derive(Default)] +pub struct InsertStats { + pub new: usize, + pub updated: usize, + pub deleted: usize, +} + +impl InsertStats { + fn empty() -> Self { + Self::default() + } +} + impl<'a> FileCache { pub(crate) fn new(db: SqlDb, semantic: Semantic) -> Self { Self { @@ -396,7 +409,7 @@ impl<'a> FileCache { buffer: &str, lang_str: &str, branches: &[String], - ) { + ) -> InsertStats { let chunk_cache = self.chunks_for_file(repo_ref, cache_keys).await; self.semantic .chunks_for_buffer( @@ -420,10 +433,16 @@ impl<'a> FileCache { info!( repo_name, relative_path, new, updated, deleted, "Successful commit" - ) + ); + InsertStats { + new, + updated, + deleted, + } } Err(err) => { - warn!(repo_name, relative_path, ?err, "Failed to upsert vectors") + warn!(repo_name, relative_path, ?err, "Failed to upsert vectors"); + InsertStats::empty() } } } diff --git 
a/server/bleep/src/indexes/file.rs b/server/bleep/src/indexes/file.rs index 43ab6b911c..04d57fdef3 100644 --- a/server/bleep/src/indexes/file.rs +++ b/server/bleep/src/indexes/file.rs @@ -29,6 +29,7 @@ use super::{ DocumentRead, Indexable, Indexer, }; use crate::{ + analytics::RepoEvent, background::SyncHandle, cache::{CacheKeys, FileCache, FileCacheSnapshot}, intelligence::TreeSitterFile, @@ -85,6 +86,7 @@ impl Indexable for File { ref reporef, ref file_cache, ref pipes, + ref app, .. }: &SyncHandle, repo: &Repository, @@ -93,11 +95,19 @@ impl Indexable for File { ) -> Result<()> { let file_filter = FileFilter::compile(&repo.file_filter)?; let cache = file_cache.retrieve(reporef).await; + let is_index_job = cache.is_empty(); // not a reindex job if there are no cache hits let repo_name = reporef.indexed_name(); let processed = &AtomicU64::new(0); + let mut analytics_event = RepoEvent::new("index") + .with_payload("repo_ref", &reporef.name()) + .with_payload("provider", &reporef.backend); + let (worker_stats_tx, mut worker_stats_rx) = + tokio::sync::mpsc::unbounded_channel::(); // collect stats for analytics + let file_worker = |count: usize| { let cache = &cache; + let worker_stats_tx = worker_stats_tx.clone(); let callback = move |dir_entry: RepoDirEntry| { let completed = processed.fetch_add(1, Ordering::Relaxed); pipes.index_percent(((completed as f32 / count as f32) * 100f32) as u8); @@ -124,8 +134,12 @@ impl Indexable for File { }; trace!(entry_disk_path, "queueing entry"); - if let Err(err) = self.worker(dir_entry, workload, writer) { - warn!(%err, entry_disk_path, "indexing failed; skipping"); + + match self.worker(dir_entry, workload, writer) { + Ok(stats) => { + let _ = worker_stats_tx.send(stats); + } + Err(err) => warn!(%err, entry_disk_path, "indexing failed; skipping"), } if let Err(err) = cache.parent().process_embedding_queue() { @@ -153,6 +167,7 @@ impl Indexable for File { repo.branch_filter.as_ref().map(Into::into), )?; let count = walker.len(); 
+ analytics_event.add_payload("file_count", &count); walker.for_each(pipes, file_worker(count)); } else { let branch = gix::open::Options::isolated() @@ -173,6 +188,7 @@ impl Indexable for File { let walker = FileWalker::index_directory(&repo.disk_path, branch); let count = walker.len(); + analytics_event.add_payload("file_count", &count); walker.for_each(pipes, file_worker(count)); }; @@ -180,7 +196,24 @@ impl Indexable for File { bail!("cancelled"); } - info!(?repo.disk_path, "repo file indexing finished, took {:?}", start.elapsed()); + // aggregate stats + drop(worker_stats_tx); + let mut repo_stats = WorkerStats::default(); + while let Some(stats) = worker_stats_rx.recv().await { + repo_stats += stats; + } + analytics_event.add_payload("chunk_count", &repo_stats.chunks); + analytics_event.add_payload("bytes", &human_readable(repo_stats.size)); + + let sync_time = start.elapsed(); + info!(?repo.disk_path, "repo file indexing finished, took {:?}", sync_time); + analytics_event.add_payload("sync_time", &format!("{sync_time:?}")); + + // send an analytics event if this is the first time this repo is being indexed + if is_index_job { + let user = app.user().await; + app.with_analytics(|hub| hub.track_repo(analytics_event, &user)); + } file_cache .synchronize(cache, |key| { @@ -541,6 +574,21 @@ impl Indexer { } } +#[derive(Default)] +struct WorkerStats { + // size in bytes + size: usize, + // number of qdrant chunkc + chunks: usize, +} + +impl std::ops::AddAssign for WorkerStats { + fn add_assign(&mut self, rhs: Self) { + self.size += rhs.size; + self.chunks += rhs.chunks; + } +} + impl File { #[tracing::instrument(fields(repo=%workload.repo_ref, entry_disk_path=?dir_entry.path()), skip_all)] fn worker( @@ -548,7 +596,7 @@ impl File { dir_entry: RepoDirEntry, workload: Workload<'_>, writer: &IndexWriter, - ) -> Result<()> { + ) -> Result { #[cfg(feature = "debug")] let start = Instant::now(); trace!("processing file"); @@ -556,20 +604,22 @@ impl File { let cache_keys 
= workload.cache_keys(&dir_entry); let last_commit = workload.repo_metadata.last_commit_unix_secs.unwrap_or(0); - match dir_entry { + let stats = match dir_entry { _ if workload.cache.is_fresh(&cache_keys) => { info!("fresh; skipping"); - return Ok(()); + return Ok(WorkerStats::default()); } RepoDirEntry::Dir(dir) => { trace!("writing dir document"); - let doc = dir.build_document(self, &workload, last_commit as u64, &cache_keys); + let (doc_stats, doc) = + dir.build_document(self, &workload, last_commit as u64, &cache_keys); writer.add_document(doc)?; trace!("dir document written"); + doc_stats } RepoDirEntry::File(file) => { trace!("writing file document"); - let doc = file + let (doc_stats, doc) = file .build_document( self, &workload, @@ -579,10 +629,10 @@ impl File { ) .ok_or(anyhow::anyhow!("failed to build document"))?; writer.add_document(doc)?; - trace!("file document written"); + doc_stats } - } + }; #[cfg(feature = "debug")] { @@ -607,7 +657,7 @@ impl File { } } - Ok(()) + Ok(stats) } } @@ -619,7 +669,7 @@ impl RepoDir { workload: &Workload<'_>, last_commit: u64, cache_keys: &CacheKeys, - ) -> tantivy::schema::Document { + ) -> (WorkerStats, tantivy::schema::Document) { let Workload { relative_path, repo_name, @@ -633,30 +683,37 @@ impl RepoDir { let relative_path_str = relative_path_str.replace('\\', "/"); let branches = self.branches.join("\n"); + let stats = WorkerStats { + size: self.size(), + chunks: 0, + }; + + ( + stats, + doc!( + schema.raw_repo_name => repo_name.as_bytes(), + schema.raw_relative_path => relative_path_str.as_bytes(), + schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), + schema.relative_path => relative_path_str, + schema.repo_ref => repo_ref.to_string(), + schema.repo_name => *repo_name, + schema.last_commit_unix_seconds => last_commit, + schema.branches => branches, + schema.is_directory => true, + schema.unique_hash => cache_keys.tantivy(), - doc!( - schema.raw_repo_name => repo_name.as_bytes(), - 
schema.raw_relative_path => relative_path_str.as_bytes(), - schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), - schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref.to_string(), - schema.repo_name => *repo_name, - schema.last_commit_unix_seconds => last_commit, - schema.branches => branches, - schema.is_directory => true, - schema.unique_hash => cache_keys.tantivy(), - - // always indicate dirs as indexed - schema.indexed => true, - - // nulls - schema.raw_content => Vec::::default(), - schema.content => String::default(), - schema.line_end_indices => Vec::::default(), - schema.lang => Vec::::default(), - schema.avg_line_length => f64::default(), - schema.symbol_locations => bincode::serialize(&SymbolLocations::default()).unwrap(), - schema.symbols => String::default(), + // always indicate dirs as indexed + schema.indexed => true, + + // nulls + schema.raw_content => Vec::::default(), + schema.content => String::default(), + schema.line_end_indices => Vec::::default(), + schema.lang => Vec::::default(), + schema.avg_line_length => f64::default(), + schema.symbol_locations => bincode::serialize(&SymbolLocations::default()).unwrap(), + schema.symbols => String::default(), + ), ) } } @@ -670,7 +727,7 @@ impl RepoFile { cache_keys: &CacheKeys, last_commit: u64, file_cache: &FileCache, - ) -> Option { + ) -> Option<(WorkerStats, tantivy::schema::Document)> { let Workload { relative_path, repo_name, @@ -689,6 +746,10 @@ impl RepoFile { let branches = self.branches.join("\n"); let explicitly_allowed = file_filter.is_allowed(relative_path); let indexed = explicitly_allowed.unwrap_or_else(|| self.should_index()); + let mut stats = WorkerStats { + size: self.size(), + ..Default::default() + }; if !indexed { let lang_str = repo_metadata @@ -699,25 +760,28 @@ impl RepoFile { "" }); - return Some(doc!( - schema.raw_content => vec![], - schema.content => "", - schema.line_end_indices => vec![], - schema.avg_line_length => 0f64, - 
schema.symbol_locations => vec![], - schema.symbols => vec![], - schema.raw_repo_name => repo_name.as_bytes(), - schema.raw_relative_path => relative_path_str.as_bytes(), - schema.unique_hash => cache_keys.tantivy(), - schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), - schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref.to_string(), - schema.repo_name => *repo_name, - schema.lang => lang_str.to_ascii_lowercase().as_bytes(), - schema.last_commit_unix_seconds => last_commit, - schema.branches => branches, - schema.is_directory => false, - schema.indexed => false, + return Some(( + stats, + doc!( + schema.raw_content => vec![], + schema.content => "", + schema.line_end_indices => vec![], + schema.avg_line_length => 0f64, + schema.symbol_locations => vec![], + schema.symbols => vec![], + schema.raw_repo_name => repo_name.as_bytes(), + schema.raw_relative_path => relative_path_str.as_bytes(), + schema.unique_hash => cache_keys.tantivy(), + schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), + schema.relative_path => relative_path_str, + schema.repo_ref => repo_ref.to_string(), + schema.repo_name => *repo_name, + schema.lang => lang_str.to_ascii_lowercase().as_bytes(), + schema.last_commit_unix_seconds => last_commit, + schema.branches => branches, + schema.is_directory => false, + schema.indexed => false, + ), )); } @@ -779,7 +843,7 @@ impl RepoFile { let lines_avg = buffer.len() as f64 / buffer.lines().count() as f64; - tokio::task::block_in_place(|| { + let insert_stats = tokio::task::block_in_place(|| { Handle::current().block_on(async { file_cache .process_semantic( @@ -791,29 +855,34 @@ impl RepoFile { lang_str, &self.branches, ) - .await; + .await }) }); - Some(doc!( - schema.raw_content => buffer.as_bytes(), - schema.raw_repo_name => repo_name.as_bytes(), - schema.raw_relative_path => relative_path_str.as_bytes(), - schema.unique_hash => cache_keys.tantivy(), - schema.repo_disk_path => 
repo_disk_path.to_string_lossy().as_ref(), - schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref.to_string(), - schema.repo_name => *repo_name, - schema.content => buffer, - schema.line_end_indices => line_end_indices, - schema.lang => lang_str.to_ascii_lowercase().as_bytes(), - schema.avg_line_length => lines_avg, - schema.last_commit_unix_seconds => last_commit, - schema.symbol_locations => bincode::serialize(&symbol_locations).unwrap(), - schema.symbols => symbols, - schema.branches => branches, - schema.is_directory => false, - schema.indexed => true, + stats.chunks += insert_stats.new; + + Some(( + stats, + doc!( + schema.raw_content => buffer.as_bytes(), + schema.raw_repo_name => repo_name.as_bytes(), + schema.raw_relative_path => relative_path_str.as_bytes(), + schema.unique_hash => cache_keys.tantivy(), + schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), + schema.relative_path => relative_path_str, + schema.repo_ref => repo_ref.to_string(), + schema.repo_name => *repo_name, + schema.content => buffer, + schema.line_end_indices => line_end_indices, + schema.lang => lang_str.to_ascii_lowercase().as_bytes(), + schema.avg_line_length => lines_avg, + schema.last_commit_unix_seconds => last_commit, + schema.symbol_locations => bincode::serialize(&symbol_locations).unwrap(), + schema.symbols => symbols, + schema.branches => branches, + schema.is_directory => false, + schema.indexed => true, + ), )) } } @@ -880,6 +949,18 @@ fn build_fuzzy_regex_filter(query_str: &str) -> Option { .ok() } +fn human_readable(size: usize) -> String { + let suffixes = ["B", "KB", "MB", "GB"]; + let s = suffixes + .iter() + .zip(0..10) + .rev() + .map(|(suf, exp)| (suf, size as f64 / (1024_f64.powi(exp)))) + .find(|(_, t)| t >= &1.0); + s.map(|(suffix, value)| format!("{value:.2}{suffix}")) + .unwrap_or_else(|| size.to_string()) +} + #[cfg(test)] mod tests { use super::*; @@ -899,4 +980,11 @@ mod tests { // tests addition of character `n` 
assert!(filter.as_ref().unwrap().is_match("查询解析器在哪n")); } + + #[test] + fn human_readable() { + assert_eq!(super::human_readable(15), "15.00B"); + assert_eq!(super::human_readable(1024), "1.00KB"); + assert_eq!(super::human_readable(7616597515), "7.09GB"); + } } diff --git a/server/bleep/src/repo/iterator.rs b/server/bleep/src/repo/iterator.rs index a02d2b78be..587af16a5e 100644 --- a/server/bleep/src/repo/iterator.rs +++ b/server/bleep/src/repo/iterator.rs @@ -83,6 +83,13 @@ pub struct RepoDir { pub branches: Vec, } +impl RepoDir { + pub fn size(&self) -> usize { + use std::io::{Cursor, Seek, SeekFrom}; + Cursor::new(&self.path).seek(SeekFrom::End(0)).unwrap_or(0) as usize + } +} + pub struct RepoFile { /// Path to file pub path: String, @@ -102,6 +109,10 @@ impl RepoFile { pub fn buffer(&self) -> std::io::Result { (self.buffer)() } + + pub fn size(&self) -> usize { + self.len as usize + } } #[derive(Hash, Eq, PartialEq)] From c3e5ac8fb0191f3a31dd21eb2574b69997baa23d Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 10 Nov 2023 11:25:22 +0000 Subject: [PATCH 2/4] track reindexes - marks full index resets as `SchemaUpgrade`s - marks periodic reindex jobs as `PeriodicSync`s --- server/bleep/src/indexes.rs | 7 +++++ server/bleep/src/indexes/file.rs | 48 ++++++++++++++++++++++++++++++-- server/bleep/src/lib.rs | 8 ++++-- 3 files changed, 58 insertions(+), 5 deletions(-) diff --git a/server/bleep/src/indexes.rs b/server/bleep/src/indexes.rs index 8d5c829549..14d3397d0e 100644 --- a/server/bleep/src/indexes.rs +++ b/server/bleep/src/indexes.rs @@ -79,6 +79,7 @@ pub struct Indexes { pub repo: Indexer, pub file: Indexer, pub doc: Doc, + was_index_reset: bool, write_mutex: tokio::sync::Mutex<()>, } @@ -104,9 +105,15 @@ impl Indexes { config.max_threads, )?, write_mutex: Default::default(), + was_index_reset: false, }) } + pub fn was_index_reset(mut self, was_index_reset: bool) -> Self { + self.was_index_reset = was_index_reset; + self + } + pub fn reset_databases(config: 
&Configuration) -> Result<()> { // we don't support old schemas, and tantivy will hard // error if we try to open a db with a different schema. diff --git a/server/bleep/src/indexes/file.rs b/server/bleep/src/indexes/file.rs index 04d57fdef3..164af2ba29 100644 --- a/server/bleep/src/indexes/file.rs +++ b/server/bleep/src/indexes/file.rs @@ -3,6 +3,7 @@ use std::{ panic::AssertUnwindSafe, path::{Path, PathBuf}, sync::atomic::{AtomicU64, Ordering}, + time::Duration, }; use anyhow::{bail, Result}; @@ -95,7 +96,7 @@ impl Indexable for File { ) -> Result<()> { let file_filter = FileFilter::compile(&repo.file_filter)?; let cache = file_cache.retrieve(reporef).await; - let is_index_job = cache.is_empty(); // not a reindex job if there are no cache hits + let is_first_index = cache.is_empty(); let repo_name = reporef.indexed_name(); let processed = &AtomicU64::new(0); @@ -199,18 +200,25 @@ impl Indexable for File { // aggregate stats drop(worker_stats_tx); let mut repo_stats = WorkerStats::default(); - while let Some(stats) = worker_stats_rx.recv().await { + repo_stats.is_first_index = is_first_index; + repo_stats.was_index_reset = app.indexes.was_index_reset; + while let Ok(Some(stats)) = + tokio::time::timeout(Duration::from_millis(10), worker_stats_rx.recv()).await + { repo_stats += stats; } analytics_event.add_payload("chunk_count", &repo_stats.chunks); analytics_event.add_payload("bytes", &human_readable(repo_stats.size)); + analytics_event.add_payload("index_job_kind", &repo_stats.index_job_kind()); let sync_time = start.elapsed(); info!(?repo.disk_path, "repo file indexing finished, took {:?}", sync_time); analytics_event.add_payload("sync_time", &format!("{sync_time:?}")); + dbg!(&analytics_event); + // send an analytics event if this is the first time this repo is being indexed - if is_index_job { + if repo_stats.reindex_count > 0 { let user = app.user().await; app.with_analytics(|hub| hub.track_repo(analytics_event, &user)); } @@ -580,12 +588,43 @@ struct 
WorkerStats { size: usize, // number of qdrant chunkc chunks: usize, + // number of dir-entries reindexed by this worker + reindex_count: usize, + // set to true if this is the first index of this reporef + is_first_index: bool, + + // set to true if the index was reset on startup + was_index_reset: bool, +} + +impl WorkerStats { + fn index_job_kind(&self) -> IndexJobKind { + if self.was_index_reset { + IndexJobKind::SchemaUpgrade { + reindex_file_count: self.reindex_count, + } + } else if self.is_first_index { + IndexJobKind::Index + } else { + IndexJobKind::PeriodicSync { + reindex_file_count: self.reindex_count, + } + } + } +} + +#[derive(serde::Serialize, Clone, Copy, PartialEq, Eq)] +enum IndexJobKind { + Index, + PeriodicSync { reindex_file_count: usize }, + SchemaUpgrade { reindex_file_count: usize }, } impl std::ops::AddAssign for WorkerStats { fn add_assign(&mut self, rhs: Self) { self.size += rhs.size; self.chunks += rhs.chunks; + self.reindex_count += rhs.reindex_count; } } @@ -686,6 +725,8 @@ impl RepoDir { let stats = WorkerStats { size: self.size(), chunks: 0, + reindex_count: 1, + ..Default::default() }; ( @@ -748,6 +789,7 @@ impl RepoFile { let indexed = explicitly_allowed.unwrap_or_else(|| self.should_index()); let mut stats = WorkerStats { size: self.size(), + reindex_count: 1, ..Default::default() }; diff --git a/server/bleep/src/lib.rs b/server/bleep/src/lib.rs index b96e9df1b6..83f99c913c 100644 --- a/server/bleep/src/lib.rs +++ b/server/bleep/src/lib.rs @@ -154,9 +154,10 @@ impl Application { .context("qdrant initialization failed")?; // Wipe existing dbs & caches if the schema has changed + let mut was_index_reset = false; if config.source.index_version_mismatch() { debug!("schema version mismatch, resetting state"); - + was_index_reset = true; Indexes::reset_databases(&config)?; debug!("tantivy indexes deleted"); @@ -173,7 +174,10 @@ impl Application { config.source.save_index_version()?; debug!("index version saved"); - let indexes = 
Indexes::new(&config, sql.clone()).await?.into(); + let indexes = Indexes::new(&config, sql.clone()) + .await? + .was_index_reset(dbg!(was_index_reset)) + .into(); debug!("indexes initialized"); // Enforce capabilies and features depending on environment From 146a864c0b6af063f8ba21fdd3293718634ad047 Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 10 Nov 2023 16:14:54 +0000 Subject: [PATCH 3/4] clippy --- server/bleep/src/indexes/file.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/bleep/src/indexes/file.rs b/server/bleep/src/indexes/file.rs index 164af2ba29..3f2892265e 100644 --- a/server/bleep/src/indexes/file.rs +++ b/server/bleep/src/indexes/file.rs @@ -199,9 +199,11 @@ impl Indexable for File { // aggregate stats drop(worker_stats_tx); - let mut repo_stats = WorkerStats::default(); - repo_stats.is_first_index = is_first_index; - repo_stats.was_index_reset = app.indexes.was_index_reset; + let mut repo_stats = WorkerStats { + is_first_index, + was_index_reset: app.indexes.was_index_reset, + ..Default::default() + }; while let Ok(Some(stats)) = tokio::time::timeout(Duration::from_millis(10), worker_stats_rx.recv()).await { From 65b219051d32a79949f1a9b70c395e23261d4f2f Mon Sep 17 00:00:00 2001 From: Akshay Date: Tue, 14 Nov 2023 16:46:02 +0000 Subject: [PATCH 4/4] address review comments --- server/bleep/src/cache.rs | 24 +-- server/bleep/src/indexes.rs | 14 +- server/bleep/src/indexes/analytics.rs | 120 +++++++++++ server/bleep/src/indexes/file.rs | 290 +++++++++----------------- server/bleep/src/lib.rs | 3 +- 5 files changed, 238 insertions(+), 213 deletions(-) create mode 100644 server/bleep/src/indexes/analytics.rs diff --git a/server/bleep/src/cache.rs b/server/bleep/src/cache.rs index 517267dfb2..e74529d7ba 100644 --- a/server/bleep/src/cache.rs +++ b/server/bleep/src/cache.rs @@ -429,16 +429,12 @@ impl<'a> FileCache { }); match chunk_cache.commit().await { - Ok((new, updated, deleted)) => { + Ok(stats) => { info!( 
repo_name, - relative_path, new, updated, deleted, "Successful commit" + relative_path, stats.new, stats.updated, stats.deleted, "Successful commit" ); - InsertStats { - new, - updated, - deleted, - } + stats } Err(err) => { warn!(repo_name, relative_path, ?err, "Failed to upsert vectors"); @@ -597,16 +593,20 @@ impl<'a> ChunkCache<'a> { /// Since qdrant changes are pipelined on their end, data written /// here is not necessarily available for querying when the /// commit's completed. - pub async fn commit(self) -> anyhow::Result<(usize, usize, usize)> { + pub async fn commit(self) -> anyhow::Result { let mut tx = self.sql.begin().await?; - let update_size = self.commit_branch_updates(&mut tx).await?; - let delete_size = self.commit_deletes(&mut tx).await?; - let new_size = self.commit_inserts(&mut tx).await?; + let updated = self.commit_branch_updates(&mut tx).await?; + let deleted = self.commit_deletes(&mut tx).await?; + let new = self.commit_inserts(&mut tx).await?; tx.commit().await?; - Ok((new_size, update_size, delete_size)) + Ok(InsertStats { + new, + updated, + deleted, + }) } /// Insert new additions to sqlite diff --git a/server/bleep/src/indexes.rs b/server/bleep/src/indexes.rs index 14d3397d0e..2d9648cd69 100644 --- a/server/bleep/src/indexes.rs +++ b/server/bleep/src/indexes.rs @@ -10,6 +10,7 @@ use tantivy::{ DocAddress, Document, IndexReader, IndexWriter, Score, }; +mod analytics; pub mod doc; pub mod file; pub mod reader; @@ -84,7 +85,11 @@ pub struct Indexes { } impl Indexes { - pub async fn new(config: &Configuration, sql: crate::SqlDb) -> Result { + pub async fn new( + config: &Configuration, + sql: crate::SqlDb, + was_index_reset: bool, + ) -> Result { Ok(Self { repo: Indexer::create( Repo::new(), @@ -105,15 +110,10 @@ impl Indexes { config.max_threads, )?, write_mutex: Default::default(), - was_index_reset: false, + was_index_reset, }) } - pub fn was_index_reset(mut self, was_index_reset: bool) -> Self { - self.was_index_reset = 
was_index_reset; - self - } - pub fn reset_databases(config: &Configuration) -> Result<()> { // we don't support old schemas, and tantivy will hard // error if we try to open a db with a different schema. diff --git a/server/bleep/src/indexes/analytics.rs b/server/bleep/src/indexes/analytics.rs new file mode 100644 index 0000000000..7728e3d9a9 --- /dev/null +++ b/server/bleep/src/indexes/analytics.rs @@ -0,0 +1,120 @@ +use crate::{analytics::RepoEvent, repo::RepoRef}; +use tokio::{sync::mpsc, time::Instant}; + +#[derive(Default)] +pub struct WorkerStats { + // size in bytes + pub size: usize, + // number of qdrant chunks + pub chunks: usize, + // number of dir-entries reindexed by this worker + pub reindex_count: usize, +} + +impl std::ops::AddAssign for WorkerStats { + fn add_assign(&mut self, rhs: Self) { + self.size += rhs.size; + self.chunks += rhs.chunks; + self.reindex_count += rhs.reindex_count; + } +} + +#[derive(serde::Serialize, Clone, Copy, PartialEq, Eq)] +enum IndexJobKind { + Index, + PeriodicSync { reindex_file_count: usize }, + SchemaUpgrade { reindex_file_count: usize }, +} + +// the main entrypoint into gathering analytics for an index job +pub struct StatsGatherer { + // receiver of stats from worker threads + stats_rx: mpsc::UnboundedReceiver, + // pass this along to each worker thread + stats_tx: mpsc::UnboundedSender, + // the reporef of the target index job + reporef: RepoRef, + // the moment this job began + start_time: Instant, + // set to true if this is the first index of this reporef + pub is_first_index: bool, + // set to true if the index was reset on startup + pub was_index_reset: bool, + // gather analytics events into this `event` field + pub event: RepoEvent, + // combine stats from each worker thread into `repo_stats` + pub repo_stats: WorkerStats, +} + +impl StatsGatherer { + pub fn for_repo(reporef: RepoRef) -> Self { + let (stats_tx, stats_rx) = mpsc::unbounded_channel(); + Self { + stats_rx, + stats_tx, + event: 
RepoEvent::new("index"), + reporef, + is_first_index: false, + was_index_reset: false, + start_time: Instant::now(), + repo_stats: WorkerStats::default(), + } + } + + pub fn sender(&self) -> mpsc::UnboundedSender { + self.stats_tx.clone() + } + + #[rustfmt::skip] + pub async fn finish(mut self) -> RepoEvent { + // aggregate stats + self.stats_rx.close(); + while let Some(stats) = self.stats_rx.recv().await { + self.repo_stats += stats; + } + + // determine the type of index job run + // + let job_kind = if self.was_index_reset { + IndexJobKind::SchemaUpgrade { + reindex_file_count: self.repo_stats.reindex_count, + } + } else if self.is_first_index { + IndexJobKind::Index + } else { + IndexJobKind::PeriodicSync { + reindex_file_count: self.repo_stats.reindex_count, + } + }; + + self.event.add_payload("reporef", &self.reporef.name()); + self.event.add_payload("provider", &self.reporef.backend()); + self.event.add_payload("index_job_kind", &job_kind); + self.event.add_payload("chunk_count", &self.repo_stats.chunks); + self.event.add_payload("bytes", &human_readable(self.repo_stats.size)); + self.event.add_payload("sync_time", &format!("{:?}", self.start_time.elapsed())); + self.event + } +} + +fn human_readable(size: usize) -> String { + let suffixes = ["B", "KB", "MB", "GB"]; + let s = suffixes + .iter() + .zip(0..10) + .rev() + .map(|(suf, exp)| (suf, size as f64 / (1024_f64.powi(exp)))) + .find(|(_, t)| t >= &1.0); + s.map(|(suffix, value)| format!("{value:.2}{suffix}")) + .unwrap_or_else(|| size.to_string()) +} + +#[cfg(test)] +mod test { + #[test] + fn human_readable() { + assert_eq!(super::human_readable(15), "15.00B"); + assert_eq!(super::human_readable(1024), "1.00KB"); + assert_eq!(super::human_readable(7616597515), "7.09GB"); + } +} diff --git a/server/bleep/src/indexes/file.rs b/server/bleep/src/indexes/file.rs index 3f2892265e..e6d37298b8 100644 --- a/server/bleep/src/indexes/file.rs +++ b/server/bleep/src/indexes/file.rs @@ -3,7 +3,6 @@ use std::{ 
panic::AssertUnwindSafe, path::{Path, PathBuf}, sync::atomic::{AtomicU64, Ordering}, - time::Duration, }; use anyhow::{bail, Result}; @@ -20,7 +19,10 @@ use tokenizers as _; use tokio::runtime::Handle; use tracing::{error, info, trace, warn}; -pub use super::schema::File; +pub use super::{ + analytics::{StatsGatherer, WorkerStats}, + schema::File, +}; #[cfg(feature = "debug")] use std::time::Instant; @@ -30,7 +32,6 @@ use super::{ DocumentRead, Indexable, Indexer, }; use crate::{ - analytics::RepoEvent, background::SyncHandle, cache::{CacheKeys, FileCache, FileCacheSnapshot}, intelligence::TreeSitterFile, @@ -48,6 +49,7 @@ struct Workload<'a> { repo_metadata: &'a RepoMetadata, relative_path: PathBuf, normalized_path: PathBuf, + stats_tx: tokio::sync::mpsc::UnboundedSender, } impl<'a> Workload<'a> { @@ -77,6 +79,12 @@ impl<'a> Workload<'a> { CacheKeys::new(semantic_hash, tantivy_hash) } + + fn transmit_stats(&self, stats: WorkerStats) { + if let Err(e) = self.stats_tx.send(stats) { + warn!("failed to transmit worker stats: {e}"); + } + } } #[async_trait] @@ -96,23 +104,20 @@ impl Indexable for File { ) -> Result<()> { let file_filter = FileFilter::compile(&repo.file_filter)?; let cache = file_cache.retrieve(reporef).await; - let is_first_index = cache.is_empty(); let repo_name = reporef.indexed_name(); let processed = &AtomicU64::new(0); + let mut stats_gatherer = StatsGatherer::for_repo(reporef.clone()); + stats_gatherer.is_first_index = cache.is_empty(); + stats_gatherer.was_index_reset = app.indexes.was_index_reset; - let mut analytics_event = RepoEvent::new("index") - .with_payload("repo_ref", &reporef.name()) - .with_payload("provider", &reporef.backend); - let (worker_stats_tx, mut worker_stats_rx) = - tokio::sync::mpsc::unbounded_channel::(); // collect stats for analytics - + let worker_stats_tx = stats_gatherer.sender(); let file_worker = |count: usize| { let cache = &cache; - let worker_stats_tx = worker_stats_tx.clone(); let callback = move |dir_entry: 
RepoDirEntry| { let completed = processed.fetch_add(1, Ordering::Relaxed); pipes.index_percent(((completed as f32 / count as f32) * 100f32) as u8); + let worker_stats_tx = worker_stats_tx.clone(); let entry_disk_path = dir_entry.path().to_owned(); let relative_path = { let entry_srcpath = PathBuf::from(&entry_disk_path); @@ -132,15 +137,13 @@ impl Indexable for File { normalized_path, repo_metadata, cache, + stats_tx: worker_stats_tx, }; trace!(entry_disk_path, "queueing entry"); - match self.worker(dir_entry, workload, writer) { - Ok(stats) => { - let _ = worker_stats_tx.send(stats); - } - Err(err) => warn!(%err, entry_disk_path, "indexing failed; skipping"), + if let Err(err) = self.worker(dir_entry, workload, writer) { + warn!(%err, entry_disk_path, "indexing failed; skipping"); } if let Err(err) = cache.parent().process_embedding_queue() { @@ -168,7 +171,7 @@ impl Indexable for File { repo.branch_filter.as_ref().map(Into::into), )?; let count = walker.len(); - analytics_event.add_payload("file_count", &count); + stats_gatherer.event.add_payload("file_count", &count); walker.for_each(pipes, file_worker(count)); } else { let branch = gix::open::Options::isolated() @@ -189,7 +192,7 @@ impl Indexable for File { let walker = FileWalker::index_directory(&repo.disk_path, branch); let count = walker.len(); - analytics_event.add_payload("file_count", &count); + stats_gatherer.event.add_payload("file_count", &count); walker.for_each(pipes, file_worker(count)); }; @@ -197,32 +200,12 @@ impl Indexable for File { bail!("cancelled"); } - // aggregate stats - drop(worker_stats_tx); - let mut repo_stats = WorkerStats { - is_first_index, - was_index_reset: app.indexes.was_index_reset, - ..Default::default() - }; - while let Ok(Some(stats)) = - tokio::time::timeout(Duration::from_millis(10), worker_stats_rx.recv()).await - { - repo_stats += stats; - } - analytics_event.add_payload("chunk_count", &repo_stats.chunks); - analytics_event.add_payload("bytes", 
&human_readable(repo_stats.size)); - analytics_event.add_payload("index_job_kind", &repo_stats.index_job_kind()); - - let sync_time = start.elapsed(); - info!(?repo.disk_path, "repo file indexing finished, took {:?}", sync_time); - analytics_event.add_payload("sync_time", &format!("{sync_time:?}")); - - dbg!(&analytics_event); + info!(?repo.disk_path, "repo file indexing finished, took {:?}", start.elapsed()); - // send an analytics event if this is the first time this repo is being indexed - if repo_stats.reindex_count > 0 { + if stats_gatherer.repo_stats.reindex_count > 0 { let user = app.user().await; - app.with_analytics(|hub| hub.track_repo(analytics_event, &user)); + let event = stats_gatherer.finish().await; + app.with_analytics(|hub| hub.track_repo(event, &user)); } file_cache @@ -584,52 +567,6 @@ impl Indexer { } } -#[derive(Default)] -struct WorkerStats { - // size in bytes - size: usize, - // number of qdrant chunkc - chunks: usize, - // number of dir-entries reindexed by this worker - reindex_count: usize, - // set to true if this is the first index of this reporef - is_first_index: bool, - - // set to true if the index was reset on startup - was_index_reset: bool, -} - -impl WorkerStats { - fn index_job_kind(&self) -> IndexJobKind { - if self.was_index_reset { - IndexJobKind::SchemaUpgrade { - reindex_file_count: self.reindex_count, - } - } else if self.is_first_index { - IndexJobKind::Index - } else { - IndexJobKind::PeriodicSync { - reindex_file_count: self.reindex_count, - } - } - } -} - -#[derive(serde::Serialize, Clone, Copy, PartialEq, Eq)] -enum IndexJobKind { - Index, - PeriodicSync { reindex_file_count: usize }, - SchemaUpgrade { reindex_file_count: usize }, -} - -impl std::ops::AddAssign for WorkerStats { - fn add_assign(&mut self, rhs: Self) { - self.size += rhs.size; - self.chunks += rhs.chunks; - self.reindex_count += rhs.reindex_count; - } -} - impl File { #[tracing::instrument(fields(repo=%workload.repo_ref, 
entry_disk_path=?dir_entry.path()), skip_all)] fn worker( @@ -637,7 +574,7 @@ impl File { dir_entry: RepoDirEntry, workload: Workload<'_>, writer: &IndexWriter, - ) -> Result { + ) -> Result<()> { #[cfg(feature = "debug")] let start = Instant::now(); trace!("processing file"); @@ -645,22 +582,19 @@ impl File { let cache_keys = workload.cache_keys(&dir_entry); let last_commit = workload.repo_metadata.last_commit_unix_secs.unwrap_or(0); - let stats = match dir_entry { + match dir_entry { _ if workload.cache.is_fresh(&cache_keys) => { info!("fresh; skipping"); - return Ok(WorkerStats::default()); } RepoDirEntry::Dir(dir) => { trace!("writing dir document"); - let (doc_stats, doc) = - dir.build_document(self, &workload, last_commit as u64, &cache_keys); + let doc = dir.build_document(self, &workload, last_commit as u64, &cache_keys); writer.add_document(doc)?; trace!("dir document written"); - doc_stats } RepoDirEntry::File(file) => { trace!("writing file document"); - let (doc_stats, doc) = file + let doc = file .build_document( self, &workload, @@ -671,9 +605,8 @@ impl File { .ok_or(anyhow::anyhow!("failed to build document"))?; writer.add_document(doc)?; trace!("file document written"); - doc_stats } - }; + } #[cfg(feature = "debug")] { @@ -698,7 +631,7 @@ impl File { } } - Ok(stats) + Ok(()) } } @@ -710,7 +643,7 @@ impl RepoDir { workload: &Workload<'_>, last_commit: u64, cache_keys: &CacheKeys, - ) -> (WorkerStats, tantivy::schema::Document) { + ) -> tantivy::schema::Document { let Workload { relative_path, repo_name, @@ -728,35 +661,32 @@ impl RepoDir { size: self.size(), chunks: 0, reindex_count: 1, - ..Default::default() }; - - ( - stats, - doc!( - schema.raw_repo_name => repo_name.as_bytes(), - schema.raw_relative_path => relative_path_str.as_bytes(), - schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), - schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref.to_string(), - schema.repo_name => *repo_name, - 
schema.last_commit_unix_seconds => last_commit, - schema.branches => branches, - schema.is_directory => true, - schema.unique_hash => cache_keys.tantivy(), - - // always indicate dirs as indexed - schema.indexed => true, - - // nulls - schema.raw_content => Vec::::default(), - schema.content => String::default(), - schema.line_end_indices => Vec::::default(), - schema.lang => Vec::::default(), - schema.avg_line_length => f64::default(), - schema.symbol_locations => bincode::serialize(&SymbolLocations::default()).unwrap(), - schema.symbols => String::default(), - ), + workload.transmit_stats(stats); + + doc!( + schema.raw_repo_name => repo_name.as_bytes(), + schema.raw_relative_path => relative_path_str.as_bytes(), + schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), + schema.relative_path => relative_path_str, + schema.repo_ref => repo_ref.to_string(), + schema.repo_name => *repo_name, + schema.last_commit_unix_seconds => last_commit, + schema.branches => branches, + schema.is_directory => true, + schema.unique_hash => cache_keys.tantivy(), + + // always indicate dirs as indexed + schema.indexed => true, + + // nulls + schema.raw_content => Vec::::default(), + schema.content => String::default(), + schema.line_end_indices => Vec::::default(), + schema.lang => Vec::::default(), + schema.avg_line_length => f64::default(), + schema.symbol_locations => bincode::serialize(&SymbolLocations::default()).unwrap(), + schema.symbols => String::default(), ) } } @@ -770,7 +700,7 @@ impl RepoFile { cache_keys: &CacheKeys, last_commit: u64, file_cache: &FileCache, - ) -> Option<(WorkerStats, tantivy::schema::Document)> { + ) -> Option { let Workload { relative_path, repo_name, @@ -804,28 +734,25 @@ impl RepoFile { "" }); - return Some(( - stats, - doc!( - schema.raw_content => vec![], - schema.content => "", - schema.line_end_indices => vec![], - schema.avg_line_length => 0f64, - schema.symbol_locations => vec![], - schema.symbols => vec![], - 
schema.raw_repo_name => repo_name.as_bytes(), - schema.raw_relative_path => relative_path_str.as_bytes(), - schema.unique_hash => cache_keys.tantivy(), - schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), - schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref.to_string(), - schema.repo_name => *repo_name, - schema.lang => lang_str.to_ascii_lowercase().as_bytes(), - schema.last_commit_unix_seconds => last_commit, - schema.branches => branches, - schema.is_directory => false, - schema.indexed => false, - ), + return Some(doc!( + schema.raw_content => vec![], + schema.content => "", + schema.line_end_indices => vec![], + schema.avg_line_length => 0f64, + schema.symbol_locations => vec![], + schema.symbols => vec![], + schema.raw_repo_name => repo_name.as_bytes(), + schema.raw_relative_path => relative_path_str.as_bytes(), + schema.unique_hash => cache_keys.tantivy(), + schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), + schema.relative_path => relative_path_str, + schema.repo_ref => repo_ref.to_string(), + schema.repo_name => *repo_name, + schema.lang => lang_str.to_ascii_lowercase().as_bytes(), + schema.last_commit_unix_seconds => last_commit, + schema.branches => branches, + schema.is_directory => false, + schema.indexed => false, )); } @@ -904,29 +831,27 @@ impl RepoFile { }); stats.chunks += insert_stats.new; - - Some(( - stats, - doc!( - schema.raw_content => buffer.as_bytes(), - schema.raw_repo_name => repo_name.as_bytes(), - schema.raw_relative_path => relative_path_str.as_bytes(), - schema.unique_hash => cache_keys.tantivy(), - schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), - schema.relative_path => relative_path_str, - schema.repo_ref => repo_ref.to_string(), - schema.repo_name => *repo_name, - schema.content => buffer, - schema.line_end_indices => line_end_indices, - schema.lang => lang_str.to_ascii_lowercase().as_bytes(), - schema.avg_line_length => lines_avg, - 
schema.last_commit_unix_seconds => last_commit, - schema.symbol_locations => bincode::serialize(&symbol_locations).unwrap(), - schema.symbols => symbols, - schema.branches => branches, - schema.is_directory => false, - schema.indexed => true, - ), + workload.transmit_stats(stats); + + Some(doc!( + schema.raw_content => buffer.as_bytes(), + schema.raw_repo_name => repo_name.as_bytes(), + schema.raw_relative_path => relative_path_str.as_bytes(), + schema.unique_hash => cache_keys.tantivy(), + schema.repo_disk_path => repo_disk_path.to_string_lossy().as_ref(), + schema.relative_path => relative_path_str, + schema.repo_ref => repo_ref.to_string(), + schema.repo_name => *repo_name, + schema.content => buffer, + schema.line_end_indices => line_end_indices, + schema.lang => lang_str.to_ascii_lowercase().as_bytes(), + schema.avg_line_length => lines_avg, + schema.last_commit_unix_seconds => last_commit, + schema.symbol_locations => bincode::serialize(&symbol_locations).unwrap(), + schema.symbols => symbols, + schema.branches => branches, + schema.is_directory => false, + schema.indexed => true, )) } } @@ -993,18 +918,6 @@ fn build_fuzzy_regex_filter(query_str: &str) -> Option { .ok() } -fn human_readable(size: usize) -> String { - let suffixes = ["B", "KB", "MB", "GB"]; - let s = suffixes - .iter() - .zip(0..10) - .rev() - .map(|(suf, exp)| (suf, size as f64 / (1024_f64.powi(exp)))) - .find(|(_, t)| t >= &1.0); - s.map(|(suffix, value)| format!("{value:.2}{suffix}")) - .unwrap_or_else(|| size.to_string()) -} - #[cfg(test)] mod tests { use super::*; @@ -1024,11 +937,4 @@ mod tests { // tests addition of character `n` assert!(filter.as_ref().unwrap().is_match("查询解析器在哪n")); } - - #[test] - fn human_readable() { - assert_eq!(super::human_readable(15), "15.00B"); - assert_eq!(super::human_readable(1024), "1.00KB"); - assert_eq!(super::human_readable(7616597515), "7.09GB"); - } } diff --git a/server/bleep/src/lib.rs b/server/bleep/src/lib.rs index 83f99c913c..069be546d8 100644 
--- a/server/bleep/src/lib.rs +++ b/server/bleep/src/lib.rs @@ -174,9 +174,8 @@ impl Application { config.source.save_index_version()?; debug!("index version saved"); - let indexes = Indexes::new(&config, sql.clone()) + let indexes = Indexes::new(&config, sql.clone(), was_index_reset) .await? - .was_index_reset(dbg!(was_index_reset)) .into(); debug!("indexes initialized");