diff --git a/Cargo.toml b/Cargo.toml index 3be3da952c..31cb08ab6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,14 +74,7 @@ async-broadcast = "0.7.2" async-channel = "2.5.0" async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } async-trait = "0.1.89" -async_zip = { version = "0.0.18", features = [ - "tokio", - "lzma", - "bzip2", - "xz", - "deflate", - "zstd", -] } +async_zip = { version = "0.0.18", features = ["tokio", "lzma", "bzip2", "xz", "deflate", "zstd"] } axum = { version = "0.8.8", features = ["macros"] } axum-server = { version = "0.8.0", features = ["tls-rustls"] } base64 = "0.22.1" @@ -93,11 +86,7 @@ bench-runner = { path = "core/bench/runner" } bit-set = "0.8.0" blake3 = "1.8.3" bon = "3.8.2" -byte-unit = { version = "5.2.0", default-features = false, features = [ - "serde", - "byte", - "std", -] } +byte-unit = { version = "5.2.0", default-features = false, features = ["serde", "byte", "std"] } bytemuck = { version = "1.25" } bytes = "1.11.1" charming = "0.6.0" @@ -233,10 +222,7 @@ rand_xoshiro = "0.8.0" rayon = "1.11.0" rcgen = "0.14.7" regex = "1.12.3" -reqwest = { version = "0.12.28", default-features = false, features = [ - "json", - "rustls-tls", -] } +reqwest = { version = "0.12.28", default-features = false, features = ["json", "rustls-tls"] } reqwest-middleware = { version = "0.4.2", features = ["json"] } reqwest-retry = "0.8.0" reqwest-tracing = "0.5.8" @@ -246,10 +232,7 @@ rmcp = "0.15.0" rmp-serde = "1.3.1" rolling-file = "0.2.0" rust-embed = "8.11.0" -rust-s3 = { version = "0.37.1", default-features = false, features = [ - "tokio-rustls-tls", - "tags", -] } +rust-s3 = { version = "0.37.1", default-features = false, features = ["tokio-rustls-tls", "tags"] } rustls = { version = "0.23.36", features = ["ring"] } rustls-pemfile = "2.2.0" send_wrapper = "0.6.0" @@ -283,11 +266,7 @@ tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } tokio-util = { version = "0.7.18", features 
= ["compat"] } toml = "1.0.0" -tower-http = { version = "0.6.8", features = [ - "add-extension", - "cors", - "trace", -] } +tower-http = { version = "0.6.8", features = ["add-extension", "cors", "trace"] } tracing = "0.1.44" tracing-appender = "0.2.4" tracing-opentelemetry = "0.32.1" @@ -300,13 +279,7 @@ trait-variant = "0.1.2" tungstenite = "0.28.0" twox-hash = { version = "2.1.2", features = ["xxhash32"] } ulid = "1.2.1" -uuid = { version = "1.20.0", features = [ - "v4", - "v7", - "fast-rng", - "serde", - "zerocopy", -] } +uuid = { version = "1.20.0", features = ["v4", "v7", "fast-rng", "serde", "zerocopy"] } vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", "si"] } walkdir = "2.5.0" wasm-bindgen = "0.2" diff --git a/core/common/src/types/message/message_boundaries.rs b/core/common/src/types/message/message_boundaries.rs new file mode 100644 index 0000000000..f1928e6c6f --- /dev/null +++ b/core/common/src/types/message/message_boundaries.rs @@ -0,0 +1,83 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#[derive(Clone, Copy)] +pub(crate) struct IggyMessageBoundaries<'a> { + indexes: &'a [u8], + messages_len: usize, + base_position: u32, + count: usize, +} + +impl<'a> IggyMessageBoundaries<'a> { + pub(crate) fn new( + indexes: &'a [u8], + messages_len: usize, + base_position: u32, + count: u32, + ) -> Option<Self> { + let count = count as usize; + let required_bytes = count.checked_mul(super::INDEX_SIZE)?; + if indexes.len() < required_bytes { + return None; + } + + Some(Self { + indexes, + messages_len, + base_position, + count, + }) + } + + pub(crate) fn count(&self) -> usize { + self.count + } + + pub(crate) fn boundaries(&self, index: usize) -> Option<(usize, usize)> { + if index >= self.count { + return None; + } + + let start = if index == 0 { + 0 + } else { + self.relative_position(index - 1)? + }; + + let end = if index + 1 == self.count { + self.messages_len + } else { + self.relative_position(index)? + }; + + if start > self.messages_len || end > self.messages_len || start > end { + return None; + } + + Some((start, end)) + } + + fn relative_position(&self, index: usize) -> Option<usize> { + let start = index.checked_mul(super::INDEX_SIZE)?.checked_add(4)?; + let end = start.checked_add(4)?; + let position_bytes = self.indexes.get(start..end)?; + let absolute_position = u32::from_le_bytes(position_bytes.try_into().ok()?); + Some(absolute_position.checked_sub(self.base_position)?
as usize) + } +} diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index dd3fb6b1a3..d6d42e0b42 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -17,6 +17,7 @@ */ use super::HeaderValue; +use super::message_boundaries::IggyMessageBoundaries; use super::message_header::*; use crate::BytesSerializable; use crate::IggyByteSize; @@ -25,7 +26,8 @@ use crate::error::IggyError; use crate::utils::checksum; use crate::{HeaderKey, IggyMessageHeaderView}; use bytes::{Bytes, BytesMut}; -use std::{collections::HashMap, iter::Iterator}; +use std::collections::HashMap; +use std::num::NonZeroUsize; /// A immutable view of a message. #[derive(Debug)] @@ -160,6 +162,7 @@ impl Sizeable for IggyMessageView<'_> { pub struct IggyMessageViewIterator<'a> { buffer: &'a [u8], position: usize, + indexed_last: Option<(usize, NonZeroUsize)>, } impl<'a> IggyMessageViewIterator<'a> { @@ -167,8 +170,27 @@ impl<'a> IggyMessageViewIterator<'a> { Self { buffer, position: 0, + indexed_last: None, } } + + pub(crate) fn new_with_boundaries( + messages: &'a [u8], + indexes: &'a [u8], + base_position: u32, + count: u32, + ) -> Self { + let mut iter = Self::new(messages); + if let Some(boundaries) = + IggyMessageBoundaries::new(indexes, messages.len(), base_position, count) + && boundaries.count() > 0 + { + iter.indexed_last = boundaries + .boundaries(boundaries.count() - 1) + .and_then(|(start, end)| Some((start, NonZeroUsize::new(end)?))); + } + iter + } } impl<'a> Iterator for IggyMessageViewIterator<'a> { @@ -184,4 +206,68 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { self.position += view.size(); Some(view) } + + fn last(self) -> Option<Self::Item> { + if self.position == 0 + && let Some((start, end)) = self.indexed_last + && let Ok(view) = IggyMessageView::new(&self.buffer[start..end.get()]) + { + return Some(view); + } + + let mut last = None; + for item in self { + last =
Some(item); + } + last + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::IggyMessage; + use bytes::Bytes; + + fn build_batch() -> crate::IggyMessagesBatch { + let messages = vec![ + IggyMessage::builder() + .payload(Bytes::from_static(b"one")) + .build() + .unwrap(), + IggyMessage::builder() + .payload(Bytes::from_static(b"two")) + .build() + .unwrap(), + IggyMessage::builder() + .payload(Bytes::from_static(b"three")) + .build() + .unwrap(), + ]; + crate::IggyMessagesBatch::from(messages) + } + + #[test] + fn should_return_tail_for_indexed_last_after_next() { + let batch = build_batch(); + let mut iter = IggyMessageViewIterator::new_with_boundaries( + batch.buffer(), + batch.indexes_slice(), + batch.indexes().base_position(), + batch.count(), + ); + + let first = iter.next().unwrap(); + assert_eq!(first.payload(), b"one"); + + let last = iter.last().unwrap(); + assert_eq!(last.payload(), b"three"); + } + + #[test] + fn should_return_last_message_for_raw_last() { + let batch = build_batch(); + let last = IggyMessageViewIterator::new(batch.buffer()).last().unwrap(); + assert_eq!(last.payload(), b"three"); + } } diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index aa6910b01f..d9b7b171f5 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -16,6 +16,7 @@ * under the License. 
*/ +use super::message_boundaries::IggyMessageBoundaries; use crate::{ BytesSerializable, INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, IggyMessageView, IggyMessageViewIterator, MAX_PAYLOAD_SIZE, Sizeable, Validatable, error::IggyError, @@ -52,7 +53,12 @@ impl IggyMessagesBatch { /// Create iterator over messages pub fn iter(&self) -> IggyMessageViewIterator<'_> { - IggyMessageViewIterator::new(&self.messages) + IggyMessageViewIterator::new_with_boundaries( + &self.messages, + &self.indexes, + self.indexes.base_position(), + self.count, + ) } /// Get the number of messages @@ -120,47 +126,18 @@ impl IggyMessagesBatch { self.iter().last().map(|msg| msg.header().timestamp()) } - /// Calculates the start position of a message at the given index in the buffer - fn message_start_position(&self, index: usize) -> usize { - if index == 0 { - 0 - } else { - self.position_at(index as u32 - 1) as usize - self.indexes.base_position() as usize - } - } - - /// Calculates the end position of a message at the given index in the buffer - fn message_end_position(&self, index: usize) -> usize { - if index >= self.count as usize - 1 { - self.messages.len() - } else { - self.position_at(index as u32) as usize - self.indexes.base_position() as usize - } + fn boundaries(&self) -> Option<IggyMessageBoundaries<'_>> { + IggyMessageBoundaries::new( + &self.indexes, + self.messages.len(), + self.indexes.base_position(), + self.count, + ) } /// Gets the byte range for a message at the given index fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> { - if index >= self.count as usize { - return None; - } - - let start = self.message_start_position(index); - let end = self.message_end_position(index); - - if start > self.messages.len() || end > self.messages.len() || start > end { - return None; - } - - Some((start, end)) - } - - /// Helper method to read a position (u32) from the byte array at the given index - fn position_at(&self, position_index: u32) -> u32 { - if let Some(index) =
self.indexes.get(position_index) { - index.position() - } else { - 0 - } + self.boundaries()?.boundaries(index) } /// Get the message at the specified index. diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index 55e33bf27f..231ba00a21 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -17,6 +17,7 @@ */ use super::indexes_mut::IggyIndexesMut; +use super::message_boundaries::IggyMessageBoundaries; use super::message_view_mut::IggyMessageViewMutIterator; use crate::{ BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, @@ -100,7 +101,12 @@ impl IggyMessagesBatchMut { /// Creates an iterator that yields immutable views of messages. pub fn iter(&self) -> IggyMessageViewIterator<'_> { - IggyMessageViewIterator::new(&self.messages) + IggyMessageViewIterator::new_with_boundaries( + &self.messages, + &self.indexes, + self.indexes.base_position(), + self.count(), + ) } /// Returns the number of messages in the batch. 
@@ -289,32 +295,23 @@ impl IggyMessagesBatchMut { self.indexes.get(index).map(|index| index.position()) } + fn boundaries(&self) -> Option<IggyMessageBoundaries<'_>> { + IggyMessageBoundaries::new( + &self.indexes, + self.messages.len(), + self.indexes.base_position(), + self.count(), + ) + } + /// Calculates the start position of a message at the given index in the buffer fn message_start_position(&self, index: usize) -> Option<usize> { - if index >= self.count() as usize { - return None; - } - - if index == 0 { - Some(0) - } else { - self.position_at(index as u32 - 1) - .map(|pos| (pos - self.indexes.base_position()) as usize) - } + self.get_message_boundaries(index).map(|(start, _)| start) } /// Calculates the end position of a message at the given index in the buffer fn message_end_position(&self, index: usize) -> Option<usize> { - if index >= self.count() as usize { - return None; - } - - if index == self.count() as usize - 1 { - Some(self.messages.len()) - } else { - self.position_at(index as u32) - .map(|pos| (pos - self.indexes.base_position()) as usize) - } + self.get_message_boundaries(index).map(|(_, end)| end) } /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages @@ -464,8 +461,7 @@ impl IggyMessagesBatchMut { /// Gets the byte range for a message at the given index fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> { - let start = self.message_start_position(index)?; - let end = self.message_end_position(index)?; + let (start, end) = self.boundaries()?.boundaries(index)?; if start > self.messages.len() || end > self.messages.len() diff --git a/core/common/src/types/message/mod.rs b/core/common/src/types/message/mod.rs index d3f85fb639..a20d69fc41 100644 --- a/core/common/src/types/message/mod.rs +++ b/core/common/src/types/message/mod.rs @@ -22,6 +22,7 @@ mod index; mod index_view; mod indexes; mod indexes_mut; +mod message_boundaries; mod message_header; mod message_header_view; mod message_header_view_mut;