From 36f0e8a7409412cbebdbd1764d3cbf150a32e0c4 Mon Sep 17 00:00:00 2001 From: cijiugechu Date: Mon, 2 Mar 2026 00:49:03 +0800 Subject: [PATCH 1/6] perf: make IggyMessagesBatch::last_offset O(1) --- core/common/src/types/message/messages_batch.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index aa6910b01f..f0ac080f27 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -112,7 +112,14 @@ impl IggyMessagesBatch { /// Get offset of last message pub fn last_offset(&self) -> Option { - self.iter().last().map(|msg| msg.header().offset()) + if self.is_empty() { + return None; + } + + let last_index = self.count().saturating_sub(1) as usize; + self.get(last_index) + .map(|msg| msg.header().offset()) + .or_else(|| self.iter().last().map(|msg| msg.header().offset())) } /// Get timestamp of last message From bdee7b912349f4c934e3318acd89f3c86ec7a982 Mon Sep 17 00:00:00 2001 From: cijiugechu Date: Mon, 2 Mar 2026 01:49:03 +0800 Subject: [PATCH 2/6] refactor(common): remove redundant saturating_sub --- core/common/src/types/message/messages_batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index f0ac080f27..6cb38c3cca 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -116,7 +116,7 @@ impl IggyMessagesBatch { return None; } - let last_index = self.count().saturating_sub(1) as usize; + let last_index = (self.count() - 1) as usize; self.get(last_index) .map(|msg| msg.header().offset()) .or_else(|| self.iter().last().map(|msg| msg.header().offset())) From fd2a4839fd3022c4a4d92d15afdd11640c4f37ac Mon Sep 17 00:00:00 2001 From: cijiugechu Date: Mon, 2 Mar 2026 22:16:20 +0800 Subject: [PATCH 3/6] Optimize batch last() lookup via indexed boundaries --- .../src/types/message/message_boundaries.rs | 83 ++++++++++++++ core/common/src/types/message/message_view.rs | 104 +++++++++++++++++- .../src/types/message/messages_batch.rs | 62 +++-------- .../src/types/message/messages_batch_mut.rs | 42 ++++--- core/common/src/types/message/mod.rs | 1 + 5 files changed, 222 insertions(+), 70 deletions(-) create mode 100644 core/common/src/types/message/message_boundaries.rs diff --git a/core/common/src/types/message/message_boundaries.rs b/core/common/src/types/message/message_boundaries.rs new file mode 100644 index 0000000000..f1928e6c6f --- /dev/null +++ b/core/common/src/types/message/message_boundaries.rs @@ -0,0 +1,83 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#[derive(Clone, Copy)] +pub(crate) struct IggyMessageBoundaries<'a> { + indexes: &'a [u8], + messages_len: usize, + base_position: u32, + count: usize, +} + +impl<'a> IggyMessageBoundaries<'a> { + pub(crate) fn new( + indexes: &'a [u8], + messages_len: usize, + base_position: u32, + count: u32, + ) -> Option { + let count = count as usize; + let required_bytes = count.checked_mul(super::INDEX_SIZE)?; + if indexes.len() < required_bytes { + return None; + } + + Some(Self { + indexes, + messages_len, + base_position, + count, + }) + } + + pub(crate) fn count(&self) -> usize { + self.count + } + + pub(crate) fn boundaries(&self, index: usize) -> Option<(usize, usize)> { + if index >= self.count { + return None; + } + + let start = if index == 0 { + 0 + } else { + self.relative_position(index - 1)? + }; + + let end = if index + 1 == self.count { + self.messages_len + } else { + self.relative_position(index)? + }; + + if start > self.messages_len || end > self.messages_len || start > end { + return None; + } + + Some((start, end)) + } + + fn relative_position(&self, index: usize) -> Option { + let start = index.checked_mul(super::INDEX_SIZE)?.checked_add(4)?; + let end = start.checked_add(4)?; + let position_bytes = self.indexes.get(start..end)?; + let absolute_position = u32::from_le_bytes(position_bytes.try_into().ok()?); + Some(absolute_position.checked_sub(self.base_position)? as usize) + } +} diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index dd3fb6b1a3..6537c3f460 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -17,6 +17,7 @@ */ use super::HeaderValue; +use super::message_boundaries::IggyMessageBoundaries; use super::message_header::*; use crate::BytesSerializable; use crate::IggyByteSize; @@ -25,7 +26,7 @@ use crate::error::IggyError; use crate::utils::checksum; use crate::{HeaderKey, IggyMessageHeaderView}; use bytes::{Bytes, BytesMut}; -use std::{collections::HashMap, iter::Iterator}; +use std::collections::HashMap; /// A immutable view of a message. #[derive(Debug)] @@ -160,6 +161,12 @@ impl Sizeable for IggyMessageView<'_> { pub struct IggyMessageViewIterator<'a> { buffer: &'a [u8], position: usize, + indexed: Option>, +} + +struct IggyMessageViewIteratorIndexed<'a> { + boundaries: IggyMessageBoundaries<'a>, + next_index: usize, } impl<'a> IggyMessageViewIterator<'a> { @@ -167,7 +174,26 @@ impl<'a> IggyMessageViewIterator<'a> { Self { buffer, position: 0, + indexed: None, + } + } + + pub(crate) fn new_with_boundaries( + messages: &'a [u8], + indexes: &'a [u8], + base_position: u32, + count: u32, + ) -> Self { + let mut iter = Self::new(messages); + if let Some(boundaries) = + IggyMessageBoundaries::new(indexes, messages.len(), base_position, count) + { + iter.indexed = Some(IggyMessageViewIteratorIndexed { + boundaries, + next_index: 0, + }); } + iter } } @@ -175,6 +201,17 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { type Item = IggyMessageView<'a>; fn next(&mut self) -> Option { + if let Some(indexed) = &mut self.indexed { + if indexed.next_index >= indexed.boundaries.count() { + return None; + } + + let (start, end) = indexed.boundaries.boundaries(indexed.next_index)?; + indexed.next_index += 1; + self.position = end; + return IggyMessageView::new(&self.buffer[start..end]).ok(); + } + if self.position >= self.buffer.len() { return None; } @@ -184,4 +221,69 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { self.position += view.size(); Some(view) } + + fn last(self) -> Option { + if let Some(indexed) = &self.indexed + && indexed.next_index < indexed.boundaries.count() + { + let last_index = indexed.boundaries.count() - 1; + let (start, end) = indexed.boundaries.boundaries(last_index)?; + return IggyMessageView::new(&self.buffer[start..end]).ok(); + } + + let mut last = None; + for item in self { + last = Some(item); + } + last + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::IggyMessage; + use bytes::Bytes; + + fn build_batch() -> crate::IggyMessagesBatch { + let messages = vec![ + IggyMessage::builder() + .payload(Bytes::from_static(b"one")) + .build() + .unwrap(), + IggyMessage::builder() + .payload(Bytes::from_static(b"two")) + .build() + .unwrap(), + IggyMessage::builder() + .payload(Bytes::from_static(b"three")) + .build() + .unwrap(), + ]; + crate::IggyMessagesBatch::from(messages) + } + + #[test] + fn should_return_tail_for_indexed_last_after_next() { + let batch = build_batch(); + let mut iter = IggyMessageViewIterator::new_with_boundaries( + batch.buffer(), + batch.indexes_slice(), + batch.indexes().base_position(), + batch.count(), + ); + + let first = iter.next().unwrap(); + assert_eq!(first.payload(), b"one"); + + let last = iter.last().unwrap(); + assert_eq!(last.payload(), b"three"); + } + + #[test] + fn should_return_last_message_for_raw_last() { + let batch = build_batch(); + let last = IggyMessageViewIterator::new(batch.buffer()).last().unwrap(); + assert_eq!(last.payload(), b"three"); + } } diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index 6cb38c3cca..d9b7b171f5 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -16,6 +16,7 @@ * under the License. */ +use super::message_boundaries::IggyMessageBoundaries; use crate::{ BytesSerializable, INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, IggyMessageView, IggyMessageViewIterator, MAX_PAYLOAD_SIZE, Sizeable, Validatable, error::IggyError, @@ -52,7 +53,12 @@ impl IggyMessagesBatch { /// Create iterator over messages pub fn iter(&self) -> IggyMessageViewIterator<'_> { - IggyMessageViewIterator::new(&self.messages) + IggyMessageViewIterator::new_with_boundaries( + &self.messages, + &self.indexes, + self.indexes.base_position(), + self.count, + ) } /// Get the number of messages @@ -112,14 +118,7 @@ impl IggyMessagesBatch { /// Get offset of last message pub fn last_offset(&self) -> Option { - if self.is_empty() { - return None; - } - - let last_index = (self.count() - 1) as usize; - self.get(last_index) - .map(|msg| msg.header().offset()) - .or_else(|| self.iter().last().map(|msg| msg.header().offset())) + self.iter().last().map(|msg| msg.header().offset()) } /// Get timestamp of last message @@ -127,47 +126,18 @@ impl IggyMessagesBatch { self.iter().last().map(|msg| msg.header().timestamp()) } - /// Calculates the start position of a message at the given index in the buffer - fn message_start_position(&self, index: usize) -> usize { - if index == 0 { - 0 - } else { - self.position_at(index as u32 - 1) as usize - self.indexes.base_position() as usize - } - } - - /// Calculates the end position of a message at the given index in the buffer - fn message_end_position(&self, index: usize) -> usize { - if index >= self.count as usize - 1 { - self.messages.len() - } else { - self.position_at(index as u32) as usize - self.indexes.base_position() as usize - } + fn boundaries(&self) -> Option> { + IggyMessageBoundaries::new( + &self.indexes, + self.messages.len(), + self.indexes.base_position(), + self.count, + ) } /// Gets the byte range for a message at the given index fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> { - if index >= self.count as usize { - return None; - } - - let start = self.message_start_position(index); - let end = self.message_end_position(index); - - if start > self.messages.len() || end > self.messages.len() || start > end { - return None; - } - - Some((start, end)) - } - - /// Helper method to read a position (u32) from the byte array at the given index - fn position_at(&self, position_index: u32) -> u32 { - if let Some(index) = self.indexes.get(position_index) { - index.position() - } else { - 0 - } + self.boundaries()?.boundaries(index) } /// Get the message at the specified index. diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index 55e33bf27f..231ba00a21 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -17,6 +17,7 @@ */ use super::indexes_mut::IggyIndexesMut; +use super::message_boundaries::IggyMessageBoundaries; use super::message_view_mut::IggyMessageViewMutIterator; use crate::{ BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, @@ -100,7 +101,12 @@ impl IggyMessagesBatchMut { /// Creates an iterator that yields immutable views of messages. pub fn iter(&self) -> IggyMessageViewIterator<'_> { - IggyMessageViewIterator::new(&self.messages) + IggyMessageViewIterator::new_with_boundaries( + &self.messages, + &self.indexes, + self.indexes.base_position(), + self.count(), + ) } /// Returns the number of messages in the batch. @@ -289,32 +295,23 @@ impl IggyMessagesBatchMut { self.indexes.get(index).map(|index| index.position()) } + fn boundaries(&self) -> Option> { + IggyMessageBoundaries::new( + &self.indexes, + self.messages.len(), + self.indexes.base_position(), + self.count(), + ) + } + /// Calculates the start position of a message at the given index in the buffer fn message_start_position(&self, index: usize) -> Option { - if index >= self.count() as usize { - return None; - } - - if index == 0 { - Some(0) - } else { - self.position_at(index as u32 - 1) - .map(|pos| (pos - self.indexes.base_position()) as usize) - } + self.get_message_boundaries(index).map(|(start, _)| start) } /// Calculates the end position of a message at the given index in the buffer fn message_end_position(&self, index: usize) -> Option { - if index >= self.count() as usize { - return None; - } - - if index == self.count() as usize - 1 { - Some(self.messages.len()) - } else { - self.position_at(index as u32) - .map(|pos| (pos - self.indexes.base_position()) as usize) - } + self.get_message_boundaries(index).map(|(_, end)| end) } /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages @@ -464,8 +461,7 @@ impl IggyMessagesBatchMut { /// Gets the byte range for a message at the given index fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> { - let start = self.message_start_position(index)?; - let end = self.message_end_position(index)?; + let (start, end) = self.boundaries()?.boundaries(index)?; if start > self.messages.len() || end > self.messages.len() diff --git a/core/common/src/types/message/mod.rs b/core/common/src/types/message/mod.rs index d3f85fb639..a20d69fc41 100644 --- a/core/common/src/types/message/mod.rs +++ b/core/common/src/types/message/mod.rs @@ -22,6 +22,7 @@ mod index; mod index_view; mod indexes; mod indexes_mut; +mod message_boundaries; mod message_header; mod message_header_view; mod message_header_view_mut; From 4bf9fb1117142087ce7bb4e7d612e1df818fd378 Mon Sep 17 00:00:00 2001 From: cijiugechu Date: Wed, 4 Mar 2026 22:59:37 +0800 Subject: [PATCH 4/6] fix: preserve iterator semantics with O(1) indexed last lookup --- core/common/src/types/message/message_view.rs | 38 ++++++------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index 6537c3f460..d6d42e0b42 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -27,6 +27,7 @@ use crate::utils::checksum; use crate::{HeaderKey, IggyMessageHeaderView}; use bytes::{Bytes, BytesMut}; use std::collections::HashMap; +use std::num::NonZeroUsize; /// A immutable view of a message. #[derive(Debug)] @@ -161,12 +162,7 @@ impl Sizeable for IggyMessageView<'_> { pub struct IggyMessageViewIterator<'a> { buffer: &'a [u8], position: usize, - indexed: Option>, -} - -struct IggyMessageViewIteratorIndexed<'a> { - boundaries: IggyMessageBoundaries<'a>, - next_index: usize, + indexed_last: Option<(usize, NonZeroUsize)>, } impl<'a> IggyMessageViewIterator<'a> { @@ -174,7 +170,7 @@ impl<'a> IggyMessageViewIterator<'a> { Self { buffer, position: 0, - indexed: None, + indexed_last: None, } } @@ -187,11 +183,11 @@ impl<'a> IggyMessageViewIterator<'a> { let mut iter = Self::new(messages); if let Some(boundaries) = IggyMessageBoundaries::new(indexes, messages.len(), base_position, count) + && boundaries.count() > 0 { - iter.indexed = Some(IggyMessageViewIteratorIndexed { - boundaries, - next_index: 0, - }); + iter.indexed_last = boundaries + .boundaries(boundaries.count() - 1) + .and_then(|(start, end)| Some((start, NonZeroUsize::new(end)?))); } iter } @@ -201,17 +197,6 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { type Item = IggyMessageView<'a>; fn next(&mut self) -> Option { - if let Some(indexed) = &mut self.indexed { - if indexed.next_index >= indexed.boundaries.count() { - return None; - } - - let (start, end) = indexed.boundaries.boundaries(indexed.next_index)?; - indexed.next_index += 1; - self.position = end; - return IggyMessageView::new(&self.buffer[start..end]).ok(); - } - if self.position >= self.buffer.len() { return None; } @@ -223,12 +208,11 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { } fn last(self) -> Option { - if let Some(indexed) = &self.indexed - && indexed.next_index < indexed.boundaries.count() + if self.position == 0 + && let Some((start, end)) = self.indexed_last + && let Ok(view) = IggyMessageView::new(&self.buffer[start..end.get()]) { - let last_index = indexed.boundaries.count() - 1; - let (start, end) = indexed.boundaries.boundaries(last_index)?; - return IggyMessageView::new(&self.buffer[start..end]).ok(); + return Some(view); } let mut last = None; From 2a7b205034518bbce2dbfb2987fada028bc3a33b Mon Sep 17 00:00:00 2001 From: cijiugechu Date: Wed, 4 Mar 2026 23:22:12 +0800 Subject: [PATCH 5/6] chore: apply cargo sort formatting to manifests --- Cargo.toml | 94 +++++-------------------------------- core/ai/mcp/Cargo.toml | 6 +-- core/integration/Cargo.toml | 7 +-- 3 files changed, 14 insertions(+), 93 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dc2bb66418..77a16b4241 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,14 +74,7 @@ async-broadcast = "0.7.2" async-channel = "2.5.0" async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } async-trait = "0.1.89" -async_zip = { version = "0.0.18", features = [ - "tokio", - "lzma", - "bzip2", - "xz", - "deflate", - "zstd", -] } +async_zip = { version = "0.0.18", features = ["tokio", "lzma", "bzip2", "xz", "deflate", "zstd"] } axum = { version = "0.8.8", features = ["macros"] } axum-server = { version = "0.8.0", features = ["tls-rustls"] } base64 = "0.22.1" @@ -93,11 +86,7 @@ bench-runner = { path = "core/bench/runner" } bit-set = "0.8.0" blake3 = "1.8.3" bon = "3.8.2" -byte-unit = { version = "5.2.0", default-features = false, features = [ - "serde", - "byte", - "std", -] } +byte-unit = { version = "5.2.0", default-features = false, features = ["serde", "byte", "std"] } bytemuck = { version = "1.25" } bytes = "1.11.1" charming = "0.6.0" @@ -107,19 +96,7 @@ clap_complete = "4.5.66" clock = { path = "core/clock" } colored = "3.1.1" comfy-table = "7.2.2" -compio = { version = "0.18.0", features = [ - "runtime", - "macros", - "io-uring", - "time", - "rustls", - "ring", - "net", - "quic", - "tls", - "ws", - "fs", -] } +compio = { version = "0.18.0", features = ["runtime", "macros", "io-uring", "time", "rustls", "ring", "net", "quic", "tls", "ws", "fs"] } # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668) compio-driver = "0.11.2" configs = { path = "core/configs", version = "0.1.0" } @@ -195,22 +172,9 @@ octocrab = "0.49.5" once_cell = "1.21.3" opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] } -opentelemetry-otlp = { version = "0.31.0", features = [ - "logs", - "trace", - "grpc-tonic", - "http", - "http-proto", - "reqwest-client", -] } +opentelemetry-otlp = { version = "0.31.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client"] } opentelemetry-semantic-conventions = "0.31.0" -opentelemetry_sdk = { version = "0.31.0", features = [ - "logs", - "trace", - "experimental_async_runtime", - "experimental_logs_batch_log_processor_with_async_runtime", - "experimental_trace_batch_span_processor_with_async_runtime", -] } +opentelemetry_sdk = { version = "0.31.0", features = ["logs", "trace", "experimental_async_runtime", "experimental_logs_batch_log_processor_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime"] } papaya = "0.2.3" parquet = "57.3.0" partitions = { path = "core/partitions" } @@ -232,10 +196,7 @@ rand_xoshiro = "0.8.0" rayon = "1.11.0" rcgen = "0.14.7" regex = "1.12.3" -reqwest = { version = "0.12.28", default-features = false, features = [ - "json", - "rustls-tls", -] } +reqwest = { version = "0.12.28", default-features = false, features = ["json", "rustls-tls"] } reqwest-middleware = { version = "0.4.2", features = ["json"] } reqwest-retry = "0.8.0" reqwest-tracing = "0.5.8" @@ -245,10 +206,7 @@ rmcp = "0.15.0" rmp-serde = "1.3.1" rolling-file = "0.2.0" rust-embed = "8.11.0" -rust-s3 = { version = "0.37.1", default-features = false, features = [ - "tokio-rustls-tls", - "tags", -] } +rust-s3 = { version = "0.37.1", default-features = false, features = ["tokio-rustls-tls", "tags"] } rustls = { version = "0.23.36", features = ["ring"] } rustls-pemfile = "2.2.0" send_wrapper = "0.6.0" @@ -261,13 +219,7 @@ server = { path = "core/server" } simd-json = { version = "0.17.0", features = ["serde_impl"] } slab = "0.4.12" socket2 = "0.6.2" -sqlx = { version = "0.8.6", features = [ - "runtime-tokio-rustls", - "postgres", - "chrono", - "uuid", - "json", -] } +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "json"] } static-toml = "1.3.0" strum = { version = "0.27.2", features = ["derive"] } strum_macros = "0.27.2" @@ -282,42 +234,20 @@ tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } tokio-util = { version = "0.7.18", features = ["compat"] } toml = "1.0.0" -tower-http = { version = "0.6.8", features = [ - "add-extension", - "cors", - "trace", -] } +tower-http = { version = "0.6.8", features = ["add-extension", "cors", "trace"] } tracing = "0.1.44" tracing-appender = "0.2.4" tracing-opentelemetry = "0.32.1" -tracing-subscriber = { version = "0.3.22", default-features = false, features = [ - "fmt", - "env-filter", - "ansi", -] } +tracing-subscriber = { version = "0.3.22", default-features = false, features = ["fmt", "env-filter", "ansi"] } trait-variant = "0.1.2" tungstenite = "0.28.0" twox-hash = { version = "2.1.2", features = ["xxhash32"] } ulid = "1.2.1" -uuid = { version = "1.20.0", features = [ - "v4", - "v7", - "fast-rng", - "serde", - "zerocopy", -] } +uuid = { version = "1.20.0", features = ["v4", "v7", "fast-rng", "serde", "zerocopy"] } vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", "si"] } walkdir = "2.5.0" wasm-bindgen = "0.2" -web-sys = { version = "0.3", features = [ - "Window", - "Location", - "HtmlSelectElement", - "Clipboard", - "Navigator", - "ResizeObserver", - "ResizeObserverEntry", -] } +web-sys = { version = "0.3", features = ["Window", "Location", "HtmlSelectElement", "Clipboard", "Navigator", "ResizeObserver", "ResizeObserverEntry"] } webpki-roots = "1.0.6" yew = { version = "0.22", features = ["csr"] } yew-router = "0.19" diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index 130d423fa0..ef71e93ff0 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -42,11 +42,7 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } reqwest = { workspace = true } -rmcp = { workspace = true, features = [ - "server", - "transport-io", - "transport-streamable-http-server", -] } +rmcp = { workspace = true, features = ["server", "transport-io", "transport-streamable-http-server"] } serde = { workspace = true } serde_json = { workspace = true } socket2 = "0.6" diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 13afb04dcf..c76b6fe7f2 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -54,12 +54,7 @@ rcgen = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } -rmcp = { workspace = true, features = [ - "client", - "reqwest", - "transport-streamable-http-client", - "transport-streamable-http-client-reqwest", -] } +rmcp = { workspace = true, features = ["client", "reqwest", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] } serde = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } From 9fa6d330a95ab260eecd5a6052e9115c2f3d498b Mon Sep 17 00:00:00 2001 From: cijiugechu Date: Wed, 4 Mar 2026 23:46:16 +0800 Subject: [PATCH 6/6] chore: format manifests with taplo --- Cargo.toml | 55 +++++++++++++++++++++++++++++++++---- core/ai/mcp/Cargo.toml | 6 +++- core/integration/Cargo.toml | 7 ++++- 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 77a16b4241..9ab2e1db7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,19 @@ clap_complete = "4.5.66" clock = { path = "core/clock" } colored = "3.1.1" comfy-table = "7.2.2" -compio = { version = "0.18.0", features = ["runtime", "macros", "io-uring", "time", "rustls", "ring", "net", "quic", "tls", "ws", "fs"] } +compio = { version = "0.18.0", features = [ + "runtime", + "macros", + "io-uring", + "time", + "rustls", + "ring", + "net", + "quic", + "tls", + "ws", + "fs", +] } # Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668) compio-driver = "0.11.2" configs = { path = "core/configs", version = "0.1.0" } @@ -172,9 +184,22 @@ octocrab = "0.49.5" once_cell = "1.21.3" opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] } -opentelemetry-otlp = { version = "0.31.0", features = ["logs", "trace", "grpc-tonic", "http", "http-proto", "reqwest-client"] } +opentelemetry-otlp = { version = "0.31.0", features = [ + "logs", + "trace", + "grpc-tonic", + "http", + "http-proto", + "reqwest-client", +] } opentelemetry-semantic-conventions = "0.31.0" -opentelemetry_sdk = { version = "0.31.0", features = ["logs", "trace", "experimental_async_runtime", "experimental_logs_batch_log_processor_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime"] } +opentelemetry_sdk = { version = "0.31.0", features = [ + "logs", + "trace", + "experimental_async_runtime", + "experimental_logs_batch_log_processor_with_async_runtime", + "experimental_trace_batch_span_processor_with_async_runtime", +] } papaya = "0.2.3" parquet = "57.3.0" partitions = { path = "core/partitions" } @@ -219,7 +244,13 @@ server = { path = "core/server" } simd-json = { version = "0.17.0", features = ["serde_impl"] } slab = "0.4.12" socket2 = "0.6.2" -sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "json"] } +sqlx = { version = "0.8.6", features = [ + "runtime-tokio-rustls", + "postgres", + "chrono", + "uuid", + "json", +] } static-toml = "1.3.0" strum = { version = "0.27.2", features = ["derive"] } strum_macros = "0.27.2" @@ -238,7 +269,11 @@ tower-http = { version = "0.6.8", features = ["add-extension", "cors", "trace"] tracing = "0.1.44" tracing-appender = "0.2.4" tracing-opentelemetry = "0.32.1" -tracing-subscriber = { version = "0.3.22", default-features = false, features = ["fmt", "env-filter", "ansi"] } +tracing-subscriber = { version = "0.3.22", default-features = false, features = [ + "fmt", + "env-filter", + "ansi", +] } trait-variant = "0.1.2" tungstenite = "0.28.0" twox-hash = { version = "2.1.2", features = ["xxhash32"] } @@ -247,7 +282,15 @@ uuid = { version = "1.20.0", features = ["v4", "v7", "fast-rng", "serde", "zeroc vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", "si"] } walkdir = "2.5.0" wasm-bindgen = "0.2" -web-sys = { version = "0.3", features = ["Window", "Location", "HtmlSelectElement", "Clipboard", "Navigator", "ResizeObserver", "ResizeObserverEntry"] } +web-sys = { version = "0.3", features = [ + "Window", + "Location", + "HtmlSelectElement", + "Clipboard", + "Navigator", + "ResizeObserver", + "ResizeObserverEntry", +] } webpki-roots = "1.0.6" yew = { version = "0.22", features = ["csr"] } yew-router = "0.19" diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index ef71e93ff0..130d423fa0 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -42,7 +42,11 @@ opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } reqwest = { workspace = true } -rmcp = { workspace = true, features = ["server", "transport-io", "transport-streamable-http-server"] } +rmcp = { workspace = true, features = [ + "server", + "transport-io", + "transport-streamable-http-server", +] } serde = { workspace = true } serde_json = { workspace = true } socket2 = "0.6" diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index c76b6fe7f2..13afb04dcf 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -54,7 +54,12 @@ rcgen = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } -rmcp = { workspace = true, features = ["client", "reqwest", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] } +rmcp = { workspace = true, features = [ + "client", + "reqwest", + "transport-streamable-http-client", + "transport-streamable-http-client-reqwest", +] } serde = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true }