From 6fa68f20b4b85507baa6c6712d50c96e7f77f400 Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 3 Mar 2026 02:42:38 +0900 Subject: [PATCH 1/5] feat(common): add protocol feature flags to iggy_common Add tcp, quic, websocket feature flags to iggy_common crate. Guard SenderKind variants, constructors, and dispatch macro with #[cfg(feature)] attributes. Make tungstenite optional, gated behind websocket feature. Signed-off-by: shin --- core/common/Cargo.toml | 8 ++++- core/common/src/lib.rs | 10 ++++-- core/common/src/sender/mod.rs | 35 ++++++++++++++++++- .../websocket_client_config.rs | 33 ++++++++++++----- 4 files changed, 72 insertions(+), 14 deletions(-) diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index aab4f87465..5dd5bbfcf6 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -27,6 +27,12 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "../../README.md" +[features] +default = ["tcp", "quic", "websocket"] +tcp = [] +quic = [] +websocket = ["dep:tungstenite"] + [dependencies] aes-gcm = { workspace = true } ahash = { workspace = true } @@ -60,7 +66,7 @@ strum = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -tungstenite = { workspace = true } +tungstenite = { workspace = true, optional = true } twox-hash = { workspace = true } ulid = { workspace = true } uuid = { workspace = true } diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 6e8b4dcf76..15f81894ec 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -49,9 +49,13 @@ pub use commands::system::*; pub use commands::topics::*; pub use commands::users::*; pub use deduplication::MessageDeduplicator; -pub use sender::{ - QuicSender, Sender, SenderKind, TcpSender, TcpTlsSender, WebSocketSender, WebSocketTlsSender, -}; +#[cfg(feature = "quic")] +pub use sender::QuicSender; +pub use sender::{Sender, SenderKind}; +#[cfg(feature = "tcp")] +pub use sender::{TcpSender, TcpTlsSender}; +#[cfg(feature = "websocket")] +pub use sender::{WebSocketSender, WebSocketTlsSender}; pub use traits::bytes_serializable::BytesSerializable; pub use traits::partitioner::Partitioner; pub use traits::sizeable::Sizeable; diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs index 1da8eb0de9..bcbabb3244 100644 --- a/core/common/src/sender/mod.rs +++ b/core/common/src/sender/mod.rs @@ -16,16 +16,31 @@ * under the License. */ +#[cfg(not(any(feature = "tcp", feature = "quic", feature = "websocket")))] +compile_error!( + "At least one protocol feature (tcp, quic, websocket) must be enabled for iggy_common" +); + +#[cfg(feature = "quic")] mod quic_sender; +#[cfg(feature = "tcp")] mod tcp_sender; +#[cfg(feature = "tcp")] mod tcp_tls_sender; +#[cfg(feature = "websocket")] mod websocket_sender; +#[cfg(feature = "websocket")] mod websocket_tls_sender; +#[cfg(feature = "quic")] pub use quic_sender::QuicSender; +#[cfg(feature = "tcp")] pub use tcp_sender::TcpSender; +#[cfg(feature = "tcp")] pub use tcp_tls_sender::TcpTlsSender; +#[cfg(feature = "websocket")] pub use websocket_sender::WebSocketSender; +#[cfg(feature = "websocket")] pub use websocket_tls_sender::WebSocketTlsSender; use crate::IggyError; @@ -33,8 +48,11 @@ use crate::alloc::buffer::PooledBuffer; use compio::BufResult; use compio::buf::IoBufMut; use compio::io::{AsyncReadExt, AsyncWriteExt}; +#[cfg(any(feature = "tcp", feature = "websocket"))] use compio::net::TcpStream; +#[cfg(feature = "quic")] use compio::quic::{RecvStream, SendStream}; +#[cfg(feature = "tcp")] use compio::tls::TlsStream; use std::future::Future; #[cfg(unix)] @@ -58,10 +76,15 @@ macro_rules! forward_async_methods { $(<$($generic $(: $bound)?),+>)? (&mut self, $( $arg: $arg_ty ),* ) -> $ret { match self { + #[cfg(feature = "tcp")] Self::Tcp(d) => d.$method_name$(::<$($generic),+>)?($( $arg ),*).await, + #[cfg(feature = "tcp")] Self::TcpTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await, + #[cfg(feature = "quic")] Self::Quic(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await, + #[cfg(feature = "websocket")] Self::WebSocket(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await, + #[cfg(feature = "websocket")] Self::WebSocketTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await, } } @@ -88,24 +111,32 @@ pub trait Sender { #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum SenderKind { + #[cfg(feature = "tcp")] Tcp(TcpSender), + #[cfg(feature = "tcp")] TcpTls(TcpTlsSender), + #[cfg(feature = "quic")] Quic(QuicSender), + #[cfg(feature = "websocket")] WebSocket(WebSocketSender), + #[cfg(feature = "websocket")] WebSocketTls(WebSocketTlsSender), } impl SenderKind { + #[cfg(feature = "tcp")] pub fn get_tcp_sender(stream: TcpStream) -> Self { Self::Tcp(TcpSender { stream: Some(stream), }) } + #[cfg(feature = "tcp")] pub fn get_tcp_tls_sender(stream: TlsStream) -> Self { Self::TcpTls(TcpTlsSender { stream }) } + #[cfg(feature = "quic")] pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) -> Self { Self::Quic(QuicSender { send: send_stream, @@ -113,15 +144,17 @@ impl SenderKind { }) } + #[cfg(feature = "websocket")] pub fn get_websocket_sender(stream: WebSocketSender) -> Self { Self::WebSocket(stream) } + #[cfg(feature = "websocket")] pub fn get_websocket_tls_sender(stream: WebSocketTlsSender) -> Self { Self::WebSocketTls(stream) } - #[cfg(unix)] + #[cfg(all(unix, feature = "tcp"))] pub fn take_and_migrate_tcp(&mut self) -> Option { match self { SenderKind::Tcp(tcp_sender) => { diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index d158fc52e3..59aaf4a0cb 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -21,6 +21,7 @@ use crate::types::configuration::websocket_config::websocket_connection_string_o use crate::{AutoLogin, IggyDuration, WebSocketClientReconnectionConfig}; use std::fmt::{Display, Formatter}; use std::str::FromStr; +#[cfg(feature = "websocket")] use tungstenite::protocol::WebSocketConfig as TungsteniteConfig; /// Configuration for the WebSocket client. @@ -82,20 +83,34 @@ impl Default for WebSocketClientConfig { impl Default for WebSocketConfig { fn default() -> Self { - // Use tungstenite defaults - let tungstenite_config = TungsteniteConfig::default(); - WebSocketConfig { - read_buffer_size: Some(tungstenite_config.read_buffer_size), - write_buffer_size: Some(tungstenite_config.write_buffer_size), - max_write_buffer_size: Some(tungstenite_config.max_write_buffer_size), - max_message_size: tungstenite_config.max_message_size, - max_frame_size: tungstenite_config.max_frame_size, - accept_unmasked_frames: false, + #[cfg(feature = "websocket")] + { + let tungstenite_config = TungsteniteConfig::default(); + WebSocketConfig { + read_buffer_size: Some(tungstenite_config.read_buffer_size), + write_buffer_size: Some(tungstenite_config.write_buffer_size), + max_write_buffer_size: Some(tungstenite_config.max_write_buffer_size), + max_message_size: tungstenite_config.max_message_size, + max_frame_size: tungstenite_config.max_frame_size, + accept_unmasked_frames: false, + } + } + #[cfg(not(feature = "websocket"))] + { + WebSocketConfig { + read_buffer_size: Some(128 * 1024), + write_buffer_size: Some(128 * 1024), + max_write_buffer_size: Some(usize::MAX), + max_message_size: Some(64 << 20), + max_frame_size: Some(16 << 20), + accept_unmasked_frames: false, + } } } } impl WebSocketConfig { + #[cfg(feature = "websocket")] /// Convert to tungstenite WebSocketConfig so we can use tungstenite defaults pub fn to_tungstenite_config(&self) -> TungsteniteConfig { let mut config = TungsteniteConfig::default(); From 3e7d3f3115af9ceebeb8cf0f2a199c0c992ac7e4 Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 3 Mar 2026 02:42:47 +0900 Subject: [PATCH 2/5] feat(configs): add websocket feature flag to configs crate Signed-off-by: shin --- core/configs/Cargo.toml | 6 +++++- core/configs/src/server_config/websocket.rs | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/configs/Cargo.toml b/core/configs/Cargo.toml index ed7d6eba3e..bcd8a5e90e 100644 --- a/core/configs/Cargo.toml +++ b/core/configs/Cargo.toml @@ -22,6 +22,10 @@ edition = "2024" license = "Apache-2.0" publish = false +[features] +default = ["websocket"] +websocket = ["dep:tungstenite"] + [dependencies] configs_derive = { workspace = true } derive_more = { workspace = true } @@ -35,4 +39,4 @@ serde_with = { workspace = true } static-toml = { workspace = true } strum = { workspace = true } tracing = { workspace = true } -tungstenite = { workspace = true } +tungstenite = { workspace = true, optional = true } diff --git a/core/configs/src/server_config/websocket.rs b/core/configs/src/server_config/websocket.rs index b5a3446fc2..3072584b98 100644 --- a/core/configs/src/server_config/websocket.rs +++ b/core/configs/src/server_config/websocket.rs @@ -20,6 +20,7 @@ use configs::ConfigEnv; use iggy_common::IggyByteSize; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; +#[cfg(feature = "websocket")] use tungstenite::protocol::WebSocketConfig as TungsteniteConfig; #[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] @@ -51,6 +52,7 @@ pub struct WebSocketTlsConfig { } impl WebSocketConfig { + #[cfg(feature = "websocket")] pub fn to_tungstenite_config(&self) -> TungsteniteConfig { let mut config = TungsteniteConfig::default(); From 67fe5fcf4e354ac0060f76b0879966473143fda4 Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 3 Mar 2026 02:43:02 +0900 Subject: [PATCH 3/5] feat(server): add protocol feature flags to server crate Add tcp, quic, http, websocket feature flags to server crate. Guard protocol modules, shard fields, task spawning, and handler imports with #[cfg(feature)] attributes. Make axum, tower-http, jsonwebtoken optional behind http feature. Signed-off-by: shin --- core/server/Cargo.toml | 20 +++-- .../messages/send_messages_handler.rs | 84 ++++++++++--------- core/server/src/lib.rs | 12 +++ core/server/src/main.rs | 1 + core/server/src/shard/builder.rs | 4 + core/server/src/shard/handlers.rs | 37 +++++--- core/server/src/shard/mod.rs | 36 ++++++-- core/server/src/shard/system/cluster.rs | 12 +++ core/server/src/shard/tasks/continuous/mod.rs | 8 ++ .../src/shard/tasks/oneshot/config_writer.rs | 28 +++++++ core/server/src/shard/tasks/periodic/mod.rs | 2 + core/server/src/shard/transmission/frame.rs | 1 + core/server/src/shard/transmission/message.rs | 6 +- 13 files changed, 185 insertions(+), 66 deletions(-) diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 39c3c5de1b..3da89b560d 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -32,10 +32,14 @@ name = "iggy-server" path = "src/main.rs" [features] -default = ["mimalloc", "iggy-web"] +default = ["mimalloc", "iggy-web", "tcp", "quic", "http", "websocket"] disable-mimalloc = [] mimalloc = ["dep:mimalloc"] -iggy-web = ["dep:rust-embed", "dep:mime_guess"] +iggy-web = ["http", "dep:rust-embed", "dep:mime_guess"] +tcp = ["iggy_common/tcp"] +quic = ["iggy_common/quic"] +http = ["dep:axum", "dep:axum-server", "dep:cyper-axum", "dep:tower-http", "dep:jsonwebtoken"] +websocket = ["iggy_common/websocket", "configs/websocket", "dep:tungstenite"] [dependencies] ahash = { workspace = true } @@ -43,8 +47,8 @@ anyhow = { workspace = true } argon2 = { workspace = true } async-channel = { workspace = true } async_zip = { workspace = true } -axum = { workspace = true } -axum-server = { workspace = true } +axum = { workspace = true, optional = true } +axum-server = { workspace = true, optional = true } bytes = { workspace = true } chrono = { workspace = true } clap = { workspace = true } @@ -52,7 +56,7 @@ compio = { workspace = true } configs = { workspace = true } ctrlc = { workspace = true } cyper = { workspace = true } -cyper-axum = { workspace = true } +cyper-axum = { workspace = true, optional = true } dashmap = { workspace = true } dotenvy = { workspace = true } enum_dispatch = { workspace = true } @@ -65,7 +69,7 @@ futures = { workspace = true } hash32 = { workspace = true } human-repr = { workspace = true } iggy_common = { workspace = true } -jsonwebtoken = { workspace = true } +jsonwebtoken = { workspace = true, optional = true } left-right = { workspace = true } metadata = { workspace = true } mimalloc = { workspace = true, optional = true } @@ -95,12 +99,12 @@ sysinfo = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } toml = { workspace = true } -tower-http = { workspace = true } +tower-http = { workspace = true, optional = true } tracing = { workspace = true } tracing-appender = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } -tungstenite = { workspace = true } +tungstenite = { workspace = true, optional = true } ulid = { workspace = true } uuid = { workspace = true } diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 652b99a7c6..e6e515ed34 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -18,7 +18,9 @@ use crate::binary::command::{BinaryServerCommand, HandlerResult, ServerCommandHandler}; use crate::shard::IggyShard; -use crate::shard::transmission::message::{ResolvedPartition, ShardRequest, ShardRequestPayload}; +use crate::shard::transmission::message::ResolvedPartition; +#[cfg(feature = "tcp")] +use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload}; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::topics; @@ -27,11 +29,14 @@ use iggy_common::Identifier; use iggy_common::PooledBuffer; use iggy_common::SenderKind; use iggy_common::Sizeable; +#[cfg(feature = "tcp")] use iggy_common::sharding::IggyNamespace; use iggy_common::{INDEX_SIZE, PartitioningKind}; use iggy_common::{IggyError, Partitioning, SendMessages, Validatable}; use std::rc::Rc; -use tracing::{debug, error, info, instrument}; +use tracing::instrument; +#[cfg(feature = "tcp")] +use tracing::{debug, error, info}; impl ServerCommandHandler for SendMessages { fn code(&self) -> u32 { @@ -164,45 +169,48 @@ impl ServerCommandHandler for SendMessages { } }; - let namespace = IggyNamespace::new(topic.stream_id, topic.topic_id, partition_id); - let user_id = session.get_user_id(); - let unsupported_socket_transfer = matches!( - self.partitioning.kind, - PartitioningKind::Balanced | PartitioningKind::MessagesKey - ); - let enabled_socket_migration = shard.config.tcp.socket_migration; - - if enabled_socket_migration - && !(session.is_migrated() || unsupported_socket_transfer) - && let Some(target_shard) = shard.find_shard(&namespace) - && target_shard.id != shard.id + #[cfg(feature = "tcp")] { - debug!( - "TCP wrong shared detected: migrating from_shard {}, to_shard {}", - shard.id, target_shard.id + let namespace = IggyNamespace::new(topic.stream_id, topic.topic_id, partition_id); + let user_id = session.get_user_id(); + let unsupported_socket_transfer = matches!( + self.partitioning.kind, + PartitioningKind::Balanced | PartitioningKind::MessagesKey ); - - if let Some(fd) = sender.take_and_migrate_tcp() { - let payload = ShardRequestPayload::SocketTransfer { - fd, - from_shard: shard.id, - client_id: session.client_id, - user_id, - address: session.ip_address, - initial_data: batch, - }; - - let request = ShardRequest::data_plane(namespace, payload); - - if let Err(e) = shard.send_to_data_plane(request).await { - error!("transfer socket to another shard failed, drop connection. {e:?}"); - return Ok(HandlerResult::Finished); + let enabled_socket_migration = shard.config.tcp.socket_migration; + + if enabled_socket_migration + && !(session.is_migrated() || unsupported_socket_transfer) + && let Some(target_shard) = shard.find_shard(&namespace) + && target_shard.id != shard.id + { + debug!( + "TCP wrong shared detected: migrating from_shard {}, to_shard {}", + shard.id, target_shard.id + ); + + if let Some(fd) = sender.take_and_migrate_tcp() { + let payload = ShardRequestPayload::SocketTransfer { + fd, + from_shard: shard.id, + client_id: session.client_id, + user_id, + address: session.ip_address, + initial_data: batch, + }; + + let request = ShardRequest::data_plane(namespace, payload); + + if let Err(e) = shard.send_to_data_plane(request).await { + error!("transfer socket to another shard failed, drop connection. {e:?}"); + return Ok(HandlerResult::Finished); + } + + info!("Sending socket transfer to shard {}", target_shard.id); + return Ok(HandlerResult::Migrated { + to_shard: target_shard.id, + }); } - - info!("Sending socket transfer to shard {}", target_shard.id); - return Ok(HandlerResult::Migrated { - to_shard: target_shard.id, - }); } } diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index b4bac970d9..c8ccbe8fa4 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -29,23 +29,35 @@ static GLOBAL: MiMalloc = MiMalloc; #[cfg(windows)] compile_error!("iggy-server doesn't support windows."); +#[cfg(not(any( + feature = "tcp", + feature = "quic", + feature = "http", + feature = "websocket" +)))] +compile_error!("At least one transport feature must be enabled (tcp, quic, http, websocket)."); + pub mod args; pub mod binary; pub mod bootstrap; pub(crate) mod compat; pub mod configs; pub mod diagnostics; +#[cfg(feature = "http")] pub mod http; pub mod io; pub mod log; pub mod metadata; +#[cfg(feature = "quic")] pub mod quic; pub mod server_error; pub mod shard; pub mod shard_allocator; pub mod state; pub mod streaming; +#[cfg(feature = "tcp")] pub mod tcp; +#[cfg(feature = "websocket")] pub mod websocket; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 6a7fc1e066..b0113a114c 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -304,6 +304,7 @@ fn main() -> Result<(), ServerError> { let metrics = Metrics::init(); // TWELFTH DISCRETE LOADING STEP. + #[cfg(feature = "tcp")] info!( "Enable TCP socket migration across shards: {}.", config.tcp.socket_migration diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 1454c9f93e..d251f63058 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -176,9 +176,13 @@ impl IggyShardBuilder { metrics, is_follower: self.is_follower, is_shutting_down: AtomicBool::new(false), + #[cfg(feature = "tcp")] tcp_bound_address: Cell::new(None), + #[cfg(feature = "quic")] quic_bound_address: Cell::new(None), + #[cfg(feature = "websocket")] websocket_bound_address: Cell::new(None), + #[cfg(feature = "http")] http_bound_address: Cell::new(None), config_writer_notify, config_writer_receiver, diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 505eb50f14..2c682a34ec 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -16,23 +16,27 @@ // under the License. use super::*; -use crate::{ - shard::{ - IggyShard, execution, - transmission::{ - event::ShardEvent, - frame::ShardResponse, - message::{ShardMessage, ShardRequest, ShardRequestPayload}, - }, - }, - tcp::{ - connection_handler::{ConnectionAction, handle_connection, handle_error}, - tcp_listener::cleanup_connection, +use crate::shard::{ + IggyShard, execution, + transmission::{ + event::ShardEvent, + frame::ShardResponse, + message::{ShardMessage, ShardRequest, ShardRequestPayload}, }, }; +#[cfg(feature = "tcp")] +use crate::tcp::{ + connection_handler::{ConnectionAction, handle_connection, handle_error}, + tcp_listener::cleanup_connection, +}; +#[cfg(feature = "tcp")] use compio::net::TcpStream; -use iggy_common::{IggyError, SenderKind, TransportProtocol, sharding::IggyNamespace}; +#[cfg(feature = "tcp")] +use iggy_common::SenderKind; +use iggy_common::{IggyError, TransportProtocol, sharding::IggyNamespace}; +#[cfg(feature = "tcp")] use nix::sys::stat::SFlag; +#[cfg(feature = "tcp")] use std::os::fd::{FromRawFd, IntoRawFd}; use tracing::info; @@ -378,6 +382,7 @@ async fn handle_request( Ok(ShardResponse::CompletePartitionRevocationResponse) } + #[cfg(feature = "tcp")] ShardRequestPayload::SocketTransfer { fd, from_shard, @@ -532,22 +537,28 @@ pub async fn handle_event(shard: &Rc, event: ShardEvent) -> Result<() protocol, address ); match protocol { + #[cfg(feature = "tcp")] TransportProtocol::Tcp => { shard.tcp_bound_address.set(Some(address)); let _ = shard.config_writer_notify.try_send(()); } + #[cfg(feature = "quic")] TransportProtocol::Quic => { shard.quic_bound_address.set(Some(address)); let _ = shard.config_writer_notify.try_send(()); } + #[cfg(feature = "http")] TransportProtocol::Http => { shard.http_bound_address.set(Some(address)); let _ = shard.config_writer_notify.try_send(()); } + #[cfg(feature = "websocket")] TransportProtocol::WebSocket => { shard.websocket_bound_address.set(Some(address)); let _ = shard.config_writer_notify.try_send(()); } + #[allow(unreachable_patterns)] + _ => {} } Ok(()) } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index e76a8cb5c6..d7110d3302 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -88,9 +88,13 @@ pub struct IggyShard { pub messages_receiver: Cell>>, pub(crate) stop_receiver: StopReceiver, pub(crate) is_shutting_down: AtomicBool, + #[cfg(feature = "tcp")] pub(crate) tcp_bound_address: Cell>, + #[cfg(feature = "quic")] pub(crate) quic_bound_address: Cell>, + #[cfg(feature = "websocket")] pub(crate) websocket_bound_address: Cell>, + #[cfg(feature = "http")] pub(crate) http_bound_address: Cell>, pub(crate) config_writer_notify: async_channel::Sender<()>, config_writer_receiver: async_channel::Receiver<()>, @@ -119,19 +123,16 @@ impl IggyShard { continuous::spawn_message_pump(self.clone()); // Spawn config writer task on shard 0 if we need to wait for bound addresses - if self.id == 0 - && (self.config.tcp.enabled - || self.config.quic.enabled - || self.config.http.enabled - || self.config.websocket.enabled) - { + if self.id == 0 && self.any_transport_enabled() { tasks::oneshot::spawn_config_writer_task(self); } + #[cfg(feature = "tcp")] if self.config.tcp.enabled { continuous::spawn_tcp_server(self.clone()); } + #[cfg(feature = "http")] if self.config.http.enabled && self.id == 0 { continuous::spawn_http_server(self.clone()); } @@ -141,9 +142,11 @@ impl IggyShard { // TODO(hubcio): QUIC doesn't properly work on all shards, especially tests `concurrent` and `system_scenario`. // it's probably related to Endpoint not Cloned between shards, but all shards are creating its own instance. // This way packet CID is invalid. (crypto-related stuff) + #[cfg(feature = "quic")] if self.config.quic.enabled && self.id == 0 { continuous::spawn_quic_server(self.clone()); } + #[cfg(feature = "websocket")] if self.config.websocket.enabled { continuous::spawn_websocket_server(self.clone()); } @@ -373,6 +376,27 @@ impl IggyShard { self.task_registry.graceful_shutdown(SHUTDOWN_TIMEOUT).await } + fn any_transport_enabled(&self) -> bool { + let mut enabled = false; + #[cfg(feature = "tcp")] + { + enabled |= self.config.tcp.enabled; + } + #[cfg(feature = "quic")] + { + enabled |= self.config.quic.enabled; + } + #[cfg(feature = "http")] + { + enabled |= self.config.http.enabled; + } + #[cfg(feature = "websocket")] + { + enabled |= self.config.websocket.enabled; + } + enabled + } + pub fn get_available_shards_count(&self) -> u32 { self.shards.len() as u32 } diff --git a/core/server/src/shard/system/cluster.rs b/core/server/src/shard/system/cluster.rs index b9e086569b..7a568d5f45 100644 --- a/core/server/src/shard/system/cluster.rs +++ b/core/server/src/shard/system/cluster.rs @@ -111,29 +111,41 @@ impl IggyShard { /// Get actual bound ports from the shard's bound addresses /// This is needed when server binds to port 0 (OS-assigned port) fn get_actual_bound_ports(&self) -> Option { + #[cfg(feature = "tcp")] let tcp_port = self .tcp_bound_address .get() .map(|addr| addr.port()) .unwrap_or_else(|| extract_port(&self.config.tcp.address)); + #[cfg(not(feature = "tcp"))] + let tcp_port = extract_port(&self.config.tcp.address); + #[cfg(feature = "quic")] let quic_port = self .quic_bound_address .get() .map(|addr| addr.port()) .unwrap_or_else(|| extract_port(&self.config.quic.address)); + #[cfg(not(feature = "quic"))] + let quic_port = extract_port(&self.config.quic.address); + #[cfg(feature = "http")] let http_port = self .http_bound_address .get() .map(|addr| addr.port()) .unwrap_or_else(|| extract_port(&self.config.http.address)); + #[cfg(not(feature = "http"))] + let http_port = extract_port(&self.config.http.address); + #[cfg(feature = "websocket")] let websocket_port = self .websocket_bound_address .get() .map(|addr| addr.port()) .unwrap_or_else(|| extract_port(&self.config.websocket.address)); + #[cfg(not(feature = "websocket"))] + let websocket_port = extract_port(&self.config.websocket.address); trace!( "Using actual bound ports - TCP: {}, QUIC: {}, HTTP: {}, WebSocket: {}", diff --git a/core/server/src/shard/tasks/continuous/mod.rs b/core/server/src/shard/tasks/continuous/mod.rs index aa59a8aa25..43f7b0f5e5 100644 --- a/core/server/src/shard/tasks/continuous/mod.rs +++ b/core/server/src/shard/tasks/continuous/mod.rs @@ -16,14 +16,22 @@ * under the License. */ +#[cfg(feature = "http")] mod http_server; mod message_pump; +#[cfg(feature = "quic")] mod quic_server; +#[cfg(feature = "tcp")] mod tcp_server; +#[cfg(feature = "websocket")] mod websocket_server; +#[cfg(feature = "http")] pub use http_server::spawn_http_server; pub use message_pump::spawn_message_pump; +#[cfg(feature = "quic")] pub use quic_server::spawn_quic_server; +#[cfg(feature = "tcp")] pub use tcp_server::spawn_tcp_server; +#[cfg(feature = "websocket")] pub use websocket_server::spawn_websocket_server; diff --git a/core/server/src/shard/tasks/oneshot/config_writer.rs b/core/server/src/shard/tasks/oneshot/config_writer.rs index 3bee6cc6de..e661ee7943 100644 --- a/core/server/src/shard/tasks/oneshot/config_writer.rs +++ b/core/server/src/shard/tasks/oneshot/config_writer.rs @@ -39,9 +39,13 @@ async fn write_config( shutdown_token: ShutdownToken, ) -> Result<(), IggyError> { let shard_clone = shard.clone(); + #[cfg(feature = "tcp")] let tcp_enabled = shard.config.tcp.enabled; + #[cfg(feature = "quic")] let quic_enabled = shard.config.quic.enabled; + #[cfg(feature = "http")] let http_enabled = shard.config.http.enabled; + #[cfg(feature = "websocket")] let websocket_enabled = shard.config.websocket.enabled; let notify_receiver = shard_clone.config_writer_receiver.clone(); @@ -62,11 +66,23 @@ async fn write_config( } } + #[cfg(feature = "tcp")] let tcp_ready = !tcp_enabled || shard_clone.tcp_bound_address.get().is_some(); + #[cfg(not(feature = "tcp"))] + let tcp_ready = true; + #[cfg(feature = "quic")] let quic_ready = !quic_enabled || shard_clone.quic_bound_address.get().is_some(); + #[cfg(not(feature = "quic"))] + let quic_ready = true; + #[cfg(feature = "http")] let http_ready = !http_enabled || shard_clone.http_bound_address.get().is_some(); + #[cfg(not(feature = "http"))] + let http_ready = true; + #[cfg(feature = "websocket")] let websocket_ready = !websocket_enabled || shard_clone.websocket_bound_address.get().is_some(); + #[cfg(not(feature = "websocket"))] + let websocket_ready = true; if tcp_ready && quic_ready && http_ready && websocket_ready { break; @@ -75,10 +91,22 @@ async fn write_config( let mut current_config = shard_clone.config.clone(); + #[cfg(feature = "tcp")] let tcp_addr = shard_clone.tcp_bound_address.get(); + #[cfg(not(feature = "tcp"))] + let tcp_addr: Option = None; + #[cfg(feature = "quic")] let quic_addr = shard_clone.quic_bound_address.get(); + #[cfg(not(feature = "quic"))] + let quic_addr: Option = None; + #[cfg(feature = "http")] let http_addr = shard_clone.http_bound_address.get(); + #[cfg(not(feature = "http"))] + let http_addr: Option = None; + #[cfg(feature = "websocket")] let websocket_addr = shard_clone.websocket_bound_address.get(); + #[cfg(not(feature = "websocket"))] + let websocket_addr: Option = None; info!( "Config writer: TCP addr = {:?}, QUIC addr = {:?}, HTTP addr = {:?}, WebSocket addr = {:?}", diff --git a/core/server/src/shard/tasks/periodic/mod.rs b/core/server/src/shard/tasks/periodic/mod.rs index 92e2f14a70..dcbe172120 100644 --- a/core/server/src/shard/tasks/periodic/mod.rs +++ b/core/server/src/shard/tasks/periodic/mod.rs @@ -17,6 +17,7 @@ */ mod heartbeat_verifier; +#[cfg(feature = "http")] mod jwt_token_cleaner; mod message_cleaner; mod message_saver; @@ -25,6 +26,7 @@ mod revocation_timeout; mod sysinfo_printer; pub use heartbeat_verifier::spawn_heartbeat_verifier; +#[cfg(feature = "http")] pub use jwt_token_cleaner::spawn_jwt_token_cleaner; pub use message_cleaner::spawn_message_cleaner; pub use message_saver::spawn_message_saver; diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index ab4351011e..73ceabb66b 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -83,6 +83,7 @@ pub enum ShardResponse { CreatePartitionsResponse(Vec), DeletePartitionsResponse(Vec), UpdateStreamResponse, + #[cfg(feature = "tcp")] SocketTransferResponse, UpdatePermissionsResponse, ChangePasswordResponse, diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index b533eddf1e..f9396d50d1 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -31,7 +31,10 @@ use iggy_common::{ update_topic::UpdateTopic, update_user::UpdateUser, }; -use std::{net::SocketAddr, os::fd::OwnedFd}; +#[cfg(feature = "tcp")] +use std::net::SocketAddr; +#[cfg(feature = "tcp")] +use std::os::fd::OwnedFd; /// Resolved stream ID. Contains only the numeric ID - `Identifier` stays at handler boundary. #[derive(Debug, Clone, Copy)] @@ -120,6 +123,7 @@ pub enum ShardRequestPayload { topic_id: usize, partition_ids: Vec, }, + #[cfg(feature = "tcp")] SocketTransfer { fd: OwnedFd, from_shard: u16, From 940a0894088b54f21d4a5982ec963629143d048f Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 3 Mar 2026 02:43:19 +0900 Subject: [PATCH 4/5] feat(sdk): add protocol feature flags for conditional compilation Add tcp, quic, http, websocket feature flags to SDK crate. Guard client modules, ClientWrapper variants, builder structs, and match arms with #[cfg(feature)] attributes. Extract shared TLS verifier to new tls module for TCP/WebSocket independence. Make transport-specific dependencies optional. Signed-off-by: shin --- core/sdk/Cargo.toml | 28 +++++--- core/sdk/src/client_provider.rs | 68 +++++++++++++++--- core/sdk/src/client_wrappers/binary_client.rs | 16 +++++ .../client_wrappers/binary_cluster_client.rs | 4 ++ .../binary_consumer_group_client.rs | 28 ++++++++ .../binary_consumer_offset_client.rs | 12 ++++ .../client_wrappers/binary_message_client.rs | 12 ++++ .../binary_partition_client.rs | 8 +++ .../binary_personal_access_token_client.rs | 16 +++++ .../client_wrappers/binary_segment_client.rs | 4 ++ .../client_wrappers/binary_stream_client.rs | 24 +++++++ .../client_wrappers/binary_system_client.rs | 28 ++++++++ .../client_wrappers/binary_topic_client.rs | 24 +++++++ .../src/client_wrappers/binary_user_client.rs | 36 ++++++++++ .../sdk/src/client_wrappers/client_wrapper.rs | 8 +++ .../src/client_wrappers/connection_info.rs | 4 ++ .../clients/binary_personal_access_tokens.rs | 7 +- core/sdk/src/clients/binary_users.rs | 4 ++ core/sdk/src/clients/client.rs | 20 +++++- core/sdk/src/clients/client_builder.rs | 44 ++++++++++-- core/sdk/src/lib.rs | 7 ++ core/sdk/src/prelude.rs | 2 + core/sdk/src/tcp/tcp_tls_verifier.rs | 53 +------------- core/sdk/src/tls/mod.rs | 19 +++++ core/sdk/src/tls/no_server_verification.rs | 70 +++++++++++++++++++ core/sdk/src/websocket/websocket_client.rs | 2 +- 26 files changed, 471 insertions(+), 77 deletions(-) create mode 100644 core/sdk/src/tls/mod.rs create mode 100644 core/sdk/src/tls/no_server_verification.rs diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 656288a824..08be860126 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -28,6 +28,14 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "../../README.md" +[features] +default = ["tokio_lock", "tcp", "quic", "http", "websocket"] +tokio_lock = [] +tcp = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"] +quic = ["dep:quinn", "dep:rustls"] +http = ["dep:reqwest", "dep:reqwest-middleware", "dep:reqwest-retry", "dep:reqwest-tracing"] +websocket = ["dep:tokio-tungstenite", "dep:tungstenite", "dep:rustls", "dep:webpki-roots"] + [dependencies] async-broadcast = { workspace = true } async-dropper = { workspace = true } @@ -40,20 +48,20 @@ futures = { workspace = true } futures-util = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } -quinn = { workspace = true } -reqwest = { workspace = true } -reqwest-middleware = { workspace = true } -reqwest-retry = { workspace = true } -reqwest-tracing = { workspace = true } -rustls = { workspace = true } +quinn = { workspace = true, optional = true } +reqwest = { workspace = true, optional = true } +reqwest-middleware = { workspace = true, optional = true } +reqwest-retry = { workspace = true, optional = true } +reqwest-tracing = { workspace = true, optional = true } +rustls = { workspace = true, optional = true } serde = { workspace = true } tokio = { workspace = true } -tokio-rustls = { workspace = true } -tokio-tungstenite = { workspace = true } +tokio-rustls = { workspace = true, optional = true } +tokio-tungstenite = { workspace = true, optional = true } tracing = { workspace = true } trait-variant = { workspace = true } -tungstenite = { workspace = true } -webpki-roots = { workspace = true } +tungstenite = { workspace = true, optional = true } +webpki-roots = { workspace = true, optional = true } [dev-dependencies] mockall = { workspace = true } diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 1b9eba7364..dfb43850b3 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -18,18 +18,32 @@ use crate::client_wrappers::client_wrapper::ClientWrapper; use crate::clients::client::IggyClient; +#[cfg(feature = "http")] use crate::http::http_client::HttpClient; -use crate::prelude::{ - ClientError, HttpClientConfig, IggyDuration, QuicClientConfig, QuicClientReconnectionConfig, - TcpClientConfig, TcpClientReconnectionConfig, WebSocketClient, -}; +use crate::prelude::ClientError; +#[cfg(feature = "http")] +use crate::prelude::HttpClientConfig; +#[cfg(any(feature = "quic", feature = "tcp", feature = "websocket"))] +use crate::prelude::IggyDuration; +#[cfg(feature = "websocket")] +use crate::prelude::WebSocketClient; +#[cfg(feature = "quic")] +use crate::prelude::{QuicClientConfig, QuicClientReconnectionConfig}; +#[cfg(feature = "tcp")] +use crate::prelude::{TcpClientConfig, TcpClientReconnectionConfig}; +#[cfg(feature = "quic")] use crate::quic::quic_client::QuicClient; +#[cfg(feature = "tcp")] use crate::tcp::tcp_client::TcpClient; +#[cfg(any(feature = "tcp", feature = "quic", feature = "websocket"))] use iggy_binary_protocol::Client; -use iggy_common::{ - AutoLogin, Credentials, TransportProtocol, WebSocketClientConfig, - WebSocketClientReconnectionConfig, WebSocketConfig, -}; +#[cfg(any(feature = "quic", feature = "tcp", feature = "websocket"))] +use iggy_common::AutoLogin; +#[cfg(any(feature = "quic", feature = "tcp", feature = "websocket"))] +use iggy_common::Credentials; +use iggy_common::TransportProtocol; +#[cfg(feature = "websocket")] +use iggy_common::{WebSocketClientConfig, WebSocketClientReconnectionConfig, WebSocketConfig}; use std::str::FromStr; use std::sync::Arc; @@ -44,22 +58,42 @@ pub struct ClientProviderConfig { /// The transport protocol to use. pub transport: TransportProtocol, /// The optional configuration for the HTTP transport. + #[cfg(feature = "http")] pub http: Option>, /// The optional configuration for the QUIC transport. + #[cfg(feature = "quic")] pub quic: Option>, /// The optional configuration for the TCP transport. + #[cfg(feature = "tcp")] pub tcp: Option>, /// The optional configuration for the WebSocket transport. + #[cfg(feature = "websocket")] pub websocket: Option>, } impl Default for ClientProviderConfig { fn default() -> ClientProviderConfig { ClientProviderConfig { + #[cfg(feature = "tcp")] transport: TransportProtocol::Tcp, + #[cfg(all(not(feature = "tcp"), feature = "quic"))] + transport: TransportProtocol::Quic, + #[cfg(all(not(feature = "tcp"), not(feature = "quic"), feature = "http"))] + transport: TransportProtocol::Http, + #[cfg(all( + not(feature = "tcp"), + not(feature = "quic"), + not(feature = "http"), + feature = "websocket" + ))] + transport: TransportProtocol::WebSocket, + #[cfg(feature = "http")] http: Some(Arc::new(HttpClientConfig::default())), + #[cfg(feature = "quic")] quic: Some(Arc::new(QuicClientConfig::default())), + #[cfg(feature = "tcp")] tcp: Some(Arc::new(TcpClientConfig::default())), + #[cfg(feature = "websocket")] websocket: Some(Arc::new(WebSocketClientConfig::default())), } } @@ -77,16 +111,22 @@ impl ClientProviderConfig { args: crate::prelude::Args, auto_login: bool, ) -> Result { + let _ = &auto_login; let transport = TransportProtocol::from_str(&args.transport) .map_err(|_| ClientError::InvalidTransport(args.transport.clone()))?; let mut config = Self { transport, + #[cfg(feature = "http")] http: None, + #[cfg(feature = "quic")] quic: None, + #[cfg(feature = "tcp")] tcp: None, + #[cfg(feature = "websocket")] websocket: None, }; match config.transport { + #[cfg(feature = "quic")] TransportProtocol::Quic => { config.quic = Some(Arc::new(QuicClientConfig { client_address: args.quic_client_address, @@ -122,12 +162,14 @@ impl ClientProviderConfig { validate_certificate: args.quic_validate_certificate, })); } + #[cfg(feature = "http")] TransportProtocol::Http => { config.http = Some(Arc::new(HttpClientConfig { api_url: args.http_api_url, retries: args.http_retries, })); } + #[cfg(feature = "tcp")] TransportProtocol::Tcp => { config.tcp = Some(Arc::new(TcpClientConfig { server_address: args.tcp_server_address, @@ -157,6 +199,7 @@ impl ClientProviderConfig { }, })); } + #[cfg(feature = "websocket")] TransportProtocol::WebSocket => { config.websocket = Some(Arc::new(WebSocketClientConfig { server_address: args.websocket_server_address, @@ -187,6 +230,8 @@ impl ClientProviderConfig { tls_validate_certificate: args.websocket_tls_validate_certificate, })); } + #[allow(unreachable_patterns)] + _ => return Err(ClientError::InvalidTransport(args.transport)), } Ok(config) @@ -216,7 +261,9 @@ pub async fn get_raw_client( config: Arc, establish_connection: bool, ) -> Result { + let _ = &establish_connection; match config.transport { + #[cfg(feature = "quic")] TransportProtocol::Quic => { let quic_config = config.quic.as_ref().unwrap(); let client = QuicClient::create(quic_config.clone())?; @@ -225,11 +272,13 @@ pub async fn get_raw_client( }; Ok(ClientWrapper::Quic(client)) } + #[cfg(feature = "http")] TransportProtocol::Http => { let http_config = config.http.as_ref().unwrap(); let client = HttpClient::create(http_config.clone())?; Ok(ClientWrapper::Http(client)) } + #[cfg(feature = "tcp")] TransportProtocol::Tcp => { let tcp_config = config.tcp.as_ref().unwrap(); let client = TcpClient::create(tcp_config.clone())?; @@ -238,6 +287,7 @@ pub async fn get_raw_client( }; Ok(ClientWrapper::Tcp(client)) } + #[cfg(feature = "websocket")] TransportProtocol::WebSocket => { let websocket_config = config.websocket.as_ref().unwrap(); let client = WebSocketClient::create(websocket_config.clone())?; @@ -246,5 +296,7 @@ pub async fn get_raw_client( }; Ok(ClientWrapper::WebSocket(client)) } + #[allow(unreachable_patterns)] + _ => Err(ClientError::InvalidTransport(config.transport.to_string())), } } diff --git a/core/sdk/src/client_wrappers/binary_client.rs b/core/sdk/src/client_wrappers/binary_client.rs index ee650c7777..c91c0a2eed 100644 --- a/core/sdk/src/client_wrappers/binary_client.rs +++ b/core/sdk/src/client_wrappers/binary_client.rs @@ -27,9 +27,13 @@ impl Client for ClientWrapper { async fn connect(&self) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.connect().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.connect().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.connect().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.connect().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.connect().await, } } @@ -37,9 +41,13 @@ impl Client for ClientWrapper { async fn disconnect(&self) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.disconnect().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.disconnect().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.disconnect().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.disconnect().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.disconnect().await, } } @@ -47,9 +55,13 @@ impl Client for ClientWrapper { async fn shutdown(&self) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.shutdown().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.shutdown().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.shutdown().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.shutdown().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.shutdown().await, } } @@ -57,9 +69,13 @@ impl Client for ClientWrapper { async fn subscribe_events(&self) -> Receiver { match self { ClientWrapper::Iggy(client) => client.subscribe_events().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.subscribe_events().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.subscribe_events().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.subscribe_events().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.subscribe_events().await, } } diff --git a/core/sdk/src/client_wrappers/binary_cluster_client.rs b/core/sdk/src/client_wrappers/binary_cluster_client.rs index ca3cd34787..46af89fdc5 100644 --- a/core/sdk/src/client_wrappers/binary_cluster_client.rs +++ b/core/sdk/src/client_wrappers/binary_cluster_client.rs @@ -26,9 +26,13 @@ impl ClusterClient for ClientWrapper { async fn get_cluster_metadata(&self) -> Result { match self { ClientWrapper::Iggy(client) => client.get_cluster_metadata().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_cluster_metadata().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_cluster_metadata().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_cluster_metadata().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_cluster_metadata().await, } } diff --git a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs index d0f3d74b74..e5ea50ea36 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_group_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_group_client.rs @@ -36,21 +36,25 @@ impl ConsumerGroupClient for ClientWrapper { .get_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .get_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .get_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .get_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .get_consumer_group(stream_id, topic_id, group_id) @@ -66,9 +70,13 @@ impl ConsumerGroupClient for ClientWrapper { ) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_consumer_groups(stream_id, topic_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_consumer_groups(stream_id, topic_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_consumer_groups(stream_id, topic_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_consumer_groups(stream_id, topic_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client.get_consumer_groups(stream_id, topic_id).await } @@ -87,21 +95,25 @@ impl ConsumerGroupClient for ClientWrapper { .create_consumer_group(stream_id, topic_id, name) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .create_consumer_group(stream_id, topic_id, name) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .create_consumer_group(stream_id, topic_id, name) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .create_consumer_group(stream_id, topic_id, name) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .create_consumer_group(stream_id, topic_id, name) @@ -122,21 +134,25 @@ impl ConsumerGroupClient for ClientWrapper { .delete_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .delete_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .delete_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .delete_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .delete_consumer_group(stream_id, topic_id, group_id) @@ -157,21 +173,25 @@ impl ConsumerGroupClient for ClientWrapper { .join_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .join_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .join_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .join_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .join_consumer_group(stream_id, topic_id, group_id) @@ -192,21 +212,25 @@ impl ConsumerGroupClient for ClientWrapper { .leave_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .leave_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .leave_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .leave_consumer_group(stream_id, topic_id, group_id) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .leave_consumer_group(stream_id, topic_id, group_id) @@ -223,15 +247,19 @@ impl AsyncDrop for ClientWrapper { ClientWrapper::Iggy(client) => { let _ = client.logout_user().await; } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { let _ = client.logout_user().await; } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { let _ = client.logout_user().await; } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { let _ = client.logout_user().await; } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { let _ = client.logout_user().await; } diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs index d934f7de9d..30610e9a0c 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs @@ -37,21 +37,25 @@ impl ConsumerOffsetClient for ClientWrapper { .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) @@ -73,21 +77,25 @@ impl ConsumerOffsetClient for ClientWrapper { .get_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .get_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .get_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .get_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .get_consumer_offset(consumer, stream_id, topic_id, partition_id) @@ -109,21 +117,25 @@ impl ConsumerOffsetClient for ClientWrapper { .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) diff --git a/core/sdk/src/client_wrappers/binary_message_client.rs b/core/sdk/src/client_wrappers/binary_message_client.rs index 48cffbb006..25cad33cad 100644 --- a/core/sdk/src/client_wrappers/binary_message_client.rs +++ b/core/sdk/src/client_wrappers/binary_message_client.rs @@ -49,6 +49,7 @@ impl MessageClient for ClientWrapper { ) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .poll_messages( @@ -62,6 +63,7 @@ impl MessageClient for ClientWrapper { ) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .poll_messages( @@ -75,6 +77,7 @@ impl MessageClient for ClientWrapper { ) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .poll_messages( @@ -88,6 +91,7 @@ impl MessageClient for ClientWrapper { ) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .poll_messages( @@ -117,21 +121,25 @@ impl MessageClient for ClientWrapper { .send_messages(stream_id, topic_id, partitioning, messages) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .send_messages(stream_id, topic_id, partitioning, messages) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .send_messages(stream_id, topic_id, partitioning, messages) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .send_messages(stream_id, topic_id, partitioning, messages) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .send_messages(stream_id, topic_id, partitioning, messages) @@ -153,21 +161,25 @@ impl MessageClient for ClientWrapper { .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync) diff --git a/core/sdk/src/client_wrappers/binary_partition_client.rs b/core/sdk/src/client_wrappers/binary_partition_client.rs index 83e60ed8e2..d3c5b40618 100644 --- a/core/sdk/src/client_wrappers/binary_partition_client.rs +++ b/core/sdk/src/client_wrappers/binary_partition_client.rs @@ -35,21 +35,25 @@ impl PartitionClient for ClientWrapper { .create_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .create_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .create_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .create_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .create_partitions(stream_id, topic_id, partitions_count) @@ -70,21 +74,25 @@ impl PartitionClient for ClientWrapper { .delete_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .delete_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .delete_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .delete_partitions(stream_id, topic_id, partitions_count) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .delete_partitions(stream_id, topic_id, partitions_count) diff --git a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs index 388f524504..30a1b22bcf 100644 --- a/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs +++ b/core/sdk/src/client_wrappers/binary_personal_access_token_client.rs @@ -29,9 +29,13 @@ impl PersonalAccessTokenClient for ClientWrapper { async fn get_personal_access_tokens(&self) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_personal_access_tokens().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_personal_access_tokens().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_personal_access_tokens().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_personal_access_tokens().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_personal_access_tokens().await, } } @@ -43,9 +47,13 @@ impl PersonalAccessTokenClient for ClientWrapper { ) -> Result { match self { ClientWrapper::Iggy(client) => client.create_personal_access_token(name, expiry).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.create_personal_access_token(name, expiry).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.create_personal_access_token(name, expiry).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.create_personal_access_token(name, expiry).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client.create_personal_access_token(name, expiry).await } @@ -55,9 +63,13 @@ impl PersonalAccessTokenClient for ClientWrapper { async fn delete_personal_access_token(&self, name: &str) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.delete_personal_access_token(name).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.delete_personal_access_token(name).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.delete_personal_access_token(name).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.delete_personal_access_token(name).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.delete_personal_access_token(name).await, } } @@ -68,9 +80,13 @@ impl PersonalAccessTokenClient for ClientWrapper { ) -> Result { match self { ClientWrapper::Iggy(client) => client.login_with_personal_access_token(token).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.login_with_personal_access_token(token).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.login_with_personal_access_token(token).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.login_with_personal_access_token(token).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client.login_with_personal_access_token(token).await } diff --git a/core/sdk/src/client_wrappers/binary_segment_client.rs b/core/sdk/src/client_wrappers/binary_segment_client.rs index f0e3b8cff5..8f469974a9 100644 --- a/core/sdk/src/client_wrappers/binary_segment_client.rs +++ b/core/sdk/src/client_wrappers/binary_segment_client.rs @@ -36,21 +36,25 @@ impl SegmentClient for ClientWrapper { .delete_segments(stream_id, topic_id, partition_id, segments_count) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .delete_segments(stream_id, topic_id, partition_id, segments_count) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .delete_segments(stream_id, topic_id, partition_id, segments_count) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .delete_segments(stream_id, topic_id, partition_id, segments_count) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .delete_segments(stream_id, topic_id, partition_id, segments_count) diff --git a/core/sdk/src/client_wrappers/binary_stream_client.rs b/core/sdk/src/client_wrappers/binary_stream_client.rs index a14d26c0a6..4675e35e2d 100644 --- a/core/sdk/src/client_wrappers/binary_stream_client.rs +++ b/core/sdk/src/client_wrappers/binary_stream_client.rs @@ -26,9 +26,13 @@ impl StreamClient for ClientWrapper { async fn get_stream(&self, stream_id: &Identifier) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_stream(stream_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_stream(stream_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_stream(stream_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_stream(stream_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_stream(stream_id).await, } } @@ -36,9 +40,13 @@ impl StreamClient for ClientWrapper { async fn get_streams(&self) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_streams().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_streams().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_streams().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_streams().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_streams().await, } } @@ -46,9 +54,13 @@ impl StreamClient for ClientWrapper { async fn create_stream(&self, name: &str) -> Result { match self { ClientWrapper::Iggy(client) => client.create_stream(name).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.create_stream(name).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.create_stream(name).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.create_stream(name).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.create_stream(name).await, } } @@ -56,9 +68,13 @@ impl StreamClient for ClientWrapper { async fn update_stream(&self, stream_id: &Identifier, name: &str) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.update_stream(stream_id, name).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.update_stream(stream_id, name).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.update_stream(stream_id, name).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.update_stream(stream_id, name).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.update_stream(stream_id, name).await, } } @@ -66,9 +82,13 @@ impl StreamClient for ClientWrapper { async fn delete_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.delete_stream(stream_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.delete_stream(stream_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.delete_stream(stream_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.delete_stream(stream_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.delete_stream(stream_id).await, } } @@ -76,9 +96,13 @@ impl StreamClient for ClientWrapper { async fn purge_stream(&self, stream_id: &Identifier) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.purge_stream(stream_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.purge_stream(stream_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.purge_stream(stream_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.purge_stream(stream_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.purge_stream(stream_id).await, } } diff --git a/core/sdk/src/client_wrappers/binary_system_client.rs b/core/sdk/src/client_wrappers/binary_system_client.rs index 39363b7266..08f4207120 100644 --- a/core/sdk/src/client_wrappers/binary_system_client.rs +++ b/core/sdk/src/client_wrappers/binary_system_client.rs @@ -29,9 +29,13 @@ impl SystemClient for ClientWrapper { async fn get_stats(&self) -> Result { match self { ClientWrapper::Iggy(client) => client.get_stats().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_stats().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_stats().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_stats().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_stats().await, } } @@ -39,9 +43,13 @@ impl SystemClient for ClientWrapper { async fn get_me(&self) -> Result { match self { ClientWrapper::Iggy(client) => client.get_me().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_me().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_me().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_me().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_me().await, } } @@ -49,9 +57,13 @@ impl SystemClient for ClientWrapper { async fn get_client(&self, client_id: u32) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_client(client_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_client(client_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_client(client_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_client(client_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_client(client_id).await, } } @@ -59,9 +71,13 @@ impl SystemClient for ClientWrapper { async fn get_clients(&self) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_clients().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_clients().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_clients().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_clients().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_clients().await, } } @@ -69,9 +85,13 @@ impl SystemClient for ClientWrapper { async fn ping(&self) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.ping().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.ping().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.ping().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.ping().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.ping().await, } } @@ -79,9 +99,13 @@ impl SystemClient for ClientWrapper { async fn heartbeat_interval(&self) -> IggyDuration { match self { ClientWrapper::Iggy(client) => client.heartbeat_interval().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.heartbeat_interval().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.heartbeat_interval().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.heartbeat_interval().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.heartbeat_interval().await, } } @@ -93,9 +117,13 @@ impl SystemClient for ClientWrapper { ) -> Result { match self { ClientWrapper::Iggy(client) => client.snapshot(compression, snapshot_types).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.snapshot(compression, snapshot_types).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.snapshot(compression, snapshot_types).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.snapshot(compression, snapshot_types).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.snapshot(compression, snapshot_types).await, } } diff --git a/core/sdk/src/client_wrappers/binary_topic_client.rs b/core/sdk/src/client_wrappers/binary_topic_client.rs index 9c8307fe09..4ba31102e3 100644 --- a/core/sdk/src/client_wrappers/binary_topic_client.rs +++ b/core/sdk/src/client_wrappers/binary_topic_client.rs @@ -32,9 +32,13 @@ impl TopicClient for ClientWrapper { ) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_topic(stream_id, topic_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_topic(stream_id, topic_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_topic(stream_id, topic_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_topic(stream_id, topic_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_topic(stream_id, topic_id).await, } } @@ -42,9 +46,13 @@ impl TopicClient for ClientWrapper { async fn get_topics(&self, stream_id: &Identifier) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_topics(stream_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_topics(stream_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_topics(stream_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_topics(stream_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_topics(stream_id).await, } } @@ -73,6 +81,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .create_topic( @@ -86,6 +95,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .create_topic( @@ -99,6 +109,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .create_topic( @@ -112,6 +123,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .create_topic( @@ -152,6 +164,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .update_topic( @@ -165,6 +178,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .update_topic( @@ -178,6 +192,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .update_topic( @@ -191,6 +206,7 @@ impl TopicClient for ClientWrapper { ) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .update_topic( @@ -214,9 +230,13 @@ impl TopicClient for ClientWrapper { ) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.delete_topic(stream_id, topic_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.delete_topic(stream_id, topic_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.delete_topic(stream_id, topic_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.delete_topic(stream_id, topic_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.delete_topic(stream_id, topic_id).await, } } @@ -228,9 +248,13 @@ impl TopicClient for ClientWrapper { ) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.purge_topic(stream_id, topic_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.purge_topic(stream_id, topic_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.purge_topic(stream_id, topic_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.purge_topic(stream_id, topic_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.purge_topic(stream_id, topic_id).await, } } diff --git a/core/sdk/src/client_wrappers/binary_user_client.rs b/core/sdk/src/client_wrappers/binary_user_client.rs index a51234f892..d0d3f052bb 100644 --- a/core/sdk/src/client_wrappers/binary_user_client.rs +++ b/core/sdk/src/client_wrappers/binary_user_client.rs @@ -28,9 +28,13 @@ impl UserClient for ClientWrapper { async fn get_user(&self, user_id: &Identifier) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_user(user_id).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_user(user_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_user(user_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_user(user_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_user(user_id).await, } } @@ -38,9 +42,13 @@ impl UserClient for ClientWrapper { async fn get_users(&self) -> Result, IggyError> { match self { ClientWrapper::Iggy(client) => client.get_users().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.get_users().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.get_users().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.get_users().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.get_users().await, } } @@ -53,16 +61,19 @@ impl UserClient for ClientWrapper { permissions: Option, ) -> Result { match self { + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .create_user(username, password, status, permissions) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .create_user(username, password, status, permissions) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .create_user(username, password, status, permissions) @@ -73,6 +84,7 @@ impl UserClient for ClientWrapper { .create_user(username, password, status, permissions) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .create_user(username, password, status, permissions) @@ -83,10 +95,14 @@ impl UserClient for ClientWrapper { async fn delete_user(&self, user_id: &Identifier) -> Result<(), IggyError> { match self { + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.delete_user(user_id).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.delete_user(user_id).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.delete_user(user_id).await, ClientWrapper::Iggy(client) => client.delete_user(user_id).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.delete_user(user_id).await, } } @@ -98,10 +114,14 @@ impl UserClient for ClientWrapper { status: Option, ) -> Result<(), IggyError> { match self { + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.update_user(user_id, username, status).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.update_user(user_id, username, status).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.update_user(user_id, username, status).await, ClientWrapper::Iggy(client) => client.update_user(user_id, username, status).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.update_user(user_id, username, status).await, } } @@ -113,9 +133,13 @@ impl UserClient for ClientWrapper { ) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.update_permissions(user_id, permissions).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.update_permissions(user_id, permissions).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.update_permissions(user_id, permissions).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.update_permissions(user_id, permissions).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client.update_permissions(user_id, permissions).await } @@ -129,16 +153,19 @@ impl UserClient for ClientWrapper { new_password: &str, ) -> Result<(), IggyError> { match self { + #[cfg(feature = "http")] ClientWrapper::Http(client) => { client .change_password(user_id, current_password, new_password) .await } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => { client .change_password(user_id, current_password, new_password) .await } + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => { client .change_password(user_id, current_password, new_password) @@ -149,6 +176,7 @@ impl UserClient for ClientWrapper { .change_password(user_id, current_password, new_password) .await } + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => { client .change_password(user_id, current_password, new_password) @@ -160,9 +188,13 @@ impl UserClient for ClientWrapper { async fn login_user(&self, username: &str, password: &str) -> Result { match self { ClientWrapper::Iggy(client) => client.login_user(username, password).await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.login_user(username, password).await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.login_user(username, password).await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.login_user(username, password).await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.login_user(username, password).await, } } @@ -170,9 +202,13 @@ impl UserClient for ClientWrapper { async fn logout_user(&self) -> Result<(), IggyError> { match self { ClientWrapper::Iggy(client) => client.logout_user().await, + #[cfg(feature = "http")] ClientWrapper::Http(client) => client.logout_user().await, + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => client.logout_user().await, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => client.logout_user().await, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => client.logout_user().await, } } diff --git a/core/sdk/src/client_wrappers/client_wrapper.rs b/core/sdk/src/client_wrappers/client_wrapper.rs index 669b2545b7..69bcb08f37 100644 --- a/core/sdk/src/client_wrappers/client_wrapper.rs +++ b/core/sdk/src/client_wrappers/client_wrapper.rs @@ -17,17 +17,25 @@ */ use crate::clients::client::IggyClient; +#[cfg(feature = "http")] use crate::http::http_client::HttpClient; +#[cfg(feature = "quic")] use crate::quic::quic_client::QuicClient; +#[cfg(feature = "tcp")] use crate::tcp::tcp_client::TcpClient; +#[cfg(feature = "websocket")] use crate::websocket::websocket_client::WebSocketClient; #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum ClientWrapper { Iggy(IggyClient), + #[cfg(feature = "http")] Http(HttpClient), + #[cfg(feature = "tcp")] Tcp(TcpClient), + #[cfg(feature = "quic")] Quic(QuicClient), + #[cfg(feature = "websocket")] WebSocket(WebSocketClient), } diff --git a/core/sdk/src/client_wrappers/connection_info.rs b/core/sdk/src/client_wrappers/connection_info.rs index 44720e05a5..6a495a092a 100644 --- a/core/sdk/src/client_wrappers/connection_info.rs +++ b/core/sdk/src/client_wrappers/connection_info.rs @@ -40,18 +40,22 @@ impl ClientWrapper { server_address: String::from("unknown"), } } + #[cfg(feature = "tcp")] ClientWrapper::Tcp(client) => ConnectionInfo { protocol: TransportProtocol::Tcp, server_address: client.current_server_address.lock().await.clone(), }, + #[cfg(feature = "quic")] ClientWrapper::Quic(client) => ConnectionInfo { protocol: TransportProtocol::Quic, server_address: client.current_server_address.lock().await.clone(), }, + #[cfg(feature = "http")] ClientWrapper::Http(client) => ConnectionInfo { protocol: TransportProtocol::Http, server_address: client.api_url.to_string(), }, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(client) => ConnectionInfo { protocol: TransportProtocol::WebSocket, server_address: client.current_server_address.lock().await.clone(), diff --git a/core/sdk/src/clients/binary_personal_access_tokens.rs b/core/sdk/src/clients/binary_personal_access_tokens.rs index 91e4ba7600..c8882e13b0 100644 --- a/core/sdk/src/clients/binary_personal_access_tokens.rs +++ b/core/sdk/src/clients/binary_personal_access_tokens.rs @@ -16,7 +16,9 @@ * under the License. */ -use crate::prelude::{ClientWrapper, IggyClient}; +#[cfg(any(feature = "tcp", feature = "quic", feature = "websocket"))] +use crate::prelude::ClientWrapper; +use crate::prelude::IggyClient; use async_trait::async_trait; use iggy_binary_protocol::{Client, PersonalAccessTokenClient}; use iggy_common::locking::IggyRwLockFn; @@ -66,8 +68,11 @@ impl PersonalAccessTokenClient for IggyClient { let should_redirect = { let client = self.client.read().await; match &*client { + #[cfg(feature = "tcp")] ClientWrapper::Tcp(tcp_client) => tcp_client.handle_leader_redirection().await?, + #[cfg(feature = "quic")] ClientWrapper::Quic(quic_client) => quic_client.handle_leader_redirection().await?, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(ws_client) => { ws_client.handle_leader_redirection().await? } diff --git a/core/sdk/src/clients/binary_users.rs b/core/sdk/src/clients/binary_users.rs index b1b4db86fb..b4c44dbe4d 100644 --- a/core/sdk/src/clients/binary_users.rs +++ b/core/sdk/src/clients/binary_users.rs @@ -16,6 +16,7 @@ * under the License. */ +#[cfg(any(feature = "tcp", feature = "quic", feature = "websocket"))] use crate::client_wrappers::client_wrapper::ClientWrapper; use crate::prelude::IggyClient; use async_trait::async_trait; @@ -103,8 +104,11 @@ impl UserClient for IggyClient { let should_redirect = { let client = self.client.read().await; match &*client { + #[cfg(feature = "tcp")] ClientWrapper::Tcp(tcp_client) => tcp_client.handle_leader_redirection().await?, + #[cfg(feature = "quic")] ClientWrapper::Quic(quic_client) => quic_client.handle_leader_redirection().await?, + #[cfg(feature = "websocket")] ClientWrapper::WebSocket(ws_client) => { ws_client.handle_leader_redirection().await? } diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs index b2dd196894..cda48e65f1 100644 --- a/core/sdk/src/clients/client.rs +++ b/core/sdk/src/clients/client.rs @@ -19,13 +19,17 @@ use crate::client_wrappers::client_wrapper::ClientWrapper; use crate::client_wrappers::connection_info::ConnectionInfo; use crate::clients::client_builder::IggyClientBuilder; +#[cfg(feature = "http")] use crate::http::http_client::HttpClient; use crate::prelude::EncryptorKind; use crate::prelude::IggyConsumerBuilder; use crate::prelude::IggyError; use crate::prelude::IggyProducerBuilder; +#[cfg(feature = "quic")] use crate::quic::quic_client::QuicClient; +#[cfg(feature = "tcp")] use crate::tcp::tcp_client::TcpClient; +#[cfg(feature = "websocket")] use crate::websocket::websocket_client::WebSocketClient; use async_broadcast::Receiver; use async_trait::async_trait; @@ -37,7 +41,7 @@ use std::fmt::Debug; use std::sync::Arc; use tokio::spawn; use tokio::time::sleep; -use tracing::log::warn; +use tracing::warn; use tracing::{debug, error, info}; /// The main client struct which implements all the `Client` traits and wraps the underlying low-level client for the specific transport. @@ -51,6 +55,7 @@ pub struct IggyClient { pub(crate) encryptor: Option>, } +#[cfg(feature = "tcp")] impl Default for IggyClient { fn default() -> Self { IggyClient::new(ClientWrapper::Tcp(TcpClient::default())) @@ -83,18 +88,24 @@ impl IggyClient { /// Creates a new `IggyClient` from the provided connection string. pub fn from_connection_string(connection_string: &str) -> Result { match ConnectionStringUtils::parse_protocol(connection_string)? { + #[cfg(feature = "tcp")] TransportProtocol::Tcp => Ok(IggyClient::new(ClientWrapper::Tcp( TcpClient::from_connection_string(connection_string)?, ))), + #[cfg(feature = "quic")] TransportProtocol::Quic => Ok(IggyClient::new(ClientWrapper::Quic( QuicClient::from_connection_string(connection_string)?, ))), + #[cfg(feature = "http")] TransportProtocol::Http => Ok(IggyClient::new(ClientWrapper::Http( HttpClient::from_connection_string(connection_string)?, ))), + #[cfg(feature = "websocket")] TransportProtocol::WebSocket => Ok(IggyClient::new(ClientWrapper::WebSocket( WebSocketClient::from_connection_string(connection_string)?, ))), + #[allow(unreachable_patterns)] + _ => Err(IggyError::InvalidCommand), } } @@ -312,6 +323,7 @@ mod tests { assert!(client.is_err()); } + #[cfg(feature = "tcp")] #[test] fn should_succeed_with_default_prefix() { let default_connection_string_prefix = "iggy://"; @@ -326,6 +338,7 @@ mod tests { assert!(client.is_ok()); } + #[cfg(feature = "tcp")] #[test] fn should_succeed_with_tcp_protocol() { let connection_string_prefix = "iggy+"; @@ -341,6 +354,7 @@ mod tests { assert!(client.is_ok()); } + #[cfg(feature = "tcp")] #[test] fn should_succeed_with_tcp_protocol_using_pat() { let connection_string_prefix = "iggy+"; @@ -353,6 +367,7 @@ mod tests { assert!(client.is_ok()); } + #[cfg(feature = "quic")] #[tokio::test] async fn should_succeed_with_quic_protocol() { let connection_string_prefix = "iggy+"; @@ -368,6 +383,7 @@ mod tests { assert!(client.is_ok()); } + #[cfg(feature = "quic")] #[tokio::test] async fn should_succeed_with_quic_protocol_using_pat() { let connection_string_prefix = "iggy+"; @@ -380,6 +396,7 @@ mod tests { assert!(client.is_ok()); } + #[cfg(feature = "http")] #[test] fn should_succeed_with_http_protocol() { let connection_string_prefix = "iggy+"; @@ -395,6 +412,7 @@ mod tests { assert!(client.is_ok()); } + #[cfg(feature = "http")] #[test] fn should_succeed_with_http_protocol_with_pat() { let connection_string_prefix = "iggy+"; diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 9600f6b387..ffc9b4f5ea 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -18,13 +18,24 @@ use crate::client_wrappers::client_wrapper::ClientWrapper; use crate::clients::client::IggyClient; +#[cfg(feature = "http")] use crate::http::http_client::HttpClient; -use crate::prelude::{ - AutoLogin, EncryptorKind, HttpClientConfigBuilder, IggyDuration, IggyError, Partitioner, - QuicClientConfigBuilder, TcpClientConfigBuilder, WebSocketClientConfigBuilder, -}; +#[cfg(feature = "http")] +use crate::prelude::HttpClientConfigBuilder; +#[cfg(feature = "quic")] +use crate::prelude::QuicClientConfigBuilder; +#[cfg(feature = "tcp")] +use crate::prelude::TcpClientConfigBuilder; +#[cfg(feature = "websocket")] +use crate::prelude::WebSocketClientConfigBuilder; +#[cfg(any(feature = "tcp", feature = "quic", feature = "websocket"))] +use crate::prelude::{AutoLogin, IggyDuration}; +use crate::prelude::{EncryptorKind, IggyError, Partitioner}; +#[cfg(feature = "quic")] use crate::quic::quic_client::QuicClient; +#[cfg(feature = "tcp")] use crate::tcp::tcp_client::TcpClient; +#[cfg(feature = "websocket")] use crate::websocket::websocket_client::WebSocketClient; use iggy_common::{ConnectionStringUtils, TransportProtocol}; use std::sync::Arc; @@ -50,26 +61,32 @@ impl IggyClientBuilder { let mut builder = Self::new(); match ConnectionStringUtils::parse_protocol(connection_string)? { + #[cfg(feature = "tcp")] TransportProtocol::Tcp => { builder.client = Some(ClientWrapper::Tcp(TcpClient::from_connection_string( connection_string, )?)); } + #[cfg(feature = "quic")] TransportProtocol::Quic => { builder.client = Some(ClientWrapper::Quic(QuicClient::from_connection_string( connection_string, )?)); } + #[cfg(feature = "http")] TransportProtocol::Http => { builder.client = Some(ClientWrapper::Http(HttpClient::from_connection_string( connection_string, )?)); } + #[cfg(feature = "websocket")] TransportProtocol::WebSocket => { builder.client = Some(ClientWrapper::WebSocket( WebSocketClient::from_connection_string(connection_string)?, )); } + #[allow(unreachable_patterns)] + _ => return Err(IggyError::InvalidCommand), } Ok(builder) @@ -96,6 +113,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the TCP client configuration. /// It returns the `TcpClientBuilder` instance, which allows to configure the TCP client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "tcp")] pub fn with_tcp(self) -> TcpClientBuilder { TcpClientBuilder { config: TcpClientConfigBuilder::default(), @@ -106,6 +124,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the QUIC client configuration. /// It returns the `QuicClientBuilder` instance, which allows to configure the QUIC client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "quic")] pub fn with_quic(self) -> QuicClientBuilder { QuicClientBuilder { config: QuicClientConfigBuilder::default(), @@ -116,6 +135,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the HTTP client configuration. /// It returns the `HttpClientBuilder` instance, which allows to configure the HTTP client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "http")] pub fn with_http(self) -> HttpClientBuilder { HttpClientBuilder { config: HttpClientConfigBuilder::default(), @@ -126,6 +146,7 @@ impl IggyClientBuilder { /// This method provides fluent API for the WebSocket client configuration. /// It returns the `WebSocketClientBuilder` instance, which allows to configure the WebSocket client with custom settings or using defaults. /// This should be called after the non-protocol specific methods, such as `with_partitioner`, `with_encryptor` or `with_message_handler`. + #[cfg(feature = "websocket")] pub fn with_websocket(self) -> WebSocketClientBuilder { WebSocketClientBuilder { config: WebSocketClientConfigBuilder::default(), @@ -147,12 +168,14 @@ impl IggyClientBuilder { } } +#[cfg(feature = "tcp")] #[derive(Debug, Default)] pub struct TcpClientBuilder { config: TcpClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "tcp")] impl TcpClientBuilder { /// Sets the server address for the TCP client. pub fn with_server_address(mut self, server_address: String) -> Self { @@ -231,12 +254,14 @@ impl TcpClientBuilder { } } +#[cfg(feature = "quic")] #[derive(Debug, Default)] pub struct QuicClientBuilder { config: QuicClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "quic")] impl QuicClientBuilder { /// Sets the server address for the QUIC client. pub fn with_server_address(mut self, server_address: String) -> Self { @@ -289,12 +314,14 @@ impl QuicClientBuilder { } } +#[cfg(feature = "http")] #[derive(Debug, Default)] pub struct HttpClientBuilder { config: HttpClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "http")] impl HttpClientBuilder { /// Sets the server address for the HTTP client. pub fn with_api_url(mut self, api_url: String) -> Self { @@ -319,11 +346,13 @@ impl HttpClientBuilder { } } +#[cfg(feature = "websocket")] pub struct WebSocketClientBuilder { config: WebSocketClientConfigBuilder, parent_builder: IggyClientBuilder, } +#[cfg(feature = "websocket")] impl WebSocketClientBuilder { /// Sets the server address for the WebSocket client. pub fn with_server_address(mut self, server_address: String) -> Self { @@ -482,6 +511,7 @@ mod tests { assert!(client_builder.is_err()); } + #[cfg(feature = "tcp")] #[test] fn should_succeed_with_default_prefix() { let default_connection_string_prefix = "iggy://"; @@ -496,6 +526,7 @@ mod tests { assert!(client_builder.is_ok()); } + #[cfg(feature = "tcp")] #[test] fn should_succeed_with_tcp_protocol() { let connection_string_prefix = "iggy+"; @@ -511,6 +542,7 @@ mod tests { assert!(client_builder.is_ok()); } + #[cfg(feature = "tcp")] #[test] fn should_succeed_with_tcp_protocol_using_pat() { let connection_string_prefix = "iggy+"; @@ -523,6 +555,7 @@ mod tests { assert!(client_builder.is_ok()); } + #[cfg(feature = "quic")] #[tokio::test] async fn should_succeed_with_quic_protocol() { let connection_string_prefix = "iggy+"; @@ -538,6 +571,7 @@ mod tests { assert!(client_builder.is_ok()); } + #[cfg(feature = "quic")] #[tokio::test] async fn should_succeed_with_quic_protocol_using_pat() { let connection_string_prefix = "iggy+"; @@ -550,6 +584,7 @@ mod tests { assert!(client_builder.is_ok()); } + #[cfg(feature = "http")] #[test] fn should_succeed_with_http_protocol() { let connection_string_prefix = "iggy+"; @@ -565,6 +600,7 @@ mod tests { assert!(client_builder.is_ok()); } + #[cfg(feature = "http")] #[test] fn should_succeed_with_http_protocol_with_pat() { let connection_string_prefix = "iggy+"; diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index f36471c1e9..0f59f88550 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -20,10 +20,17 @@ pub mod client_provider; pub mod client_wrappers; pub mod clients; pub mod consumer_ext; +#[cfg(feature = "http")] pub mod http; +#[cfg(any(feature = "tcp", feature = "quic", feature = "websocket"))] mod leader_aware; pub mod prelude; +#[cfg(feature = "quic")] pub mod quic; pub mod stream_builder; +#[cfg(feature = "tcp")] pub mod tcp; +#[cfg(any(feature = "tcp", feature = "websocket"))] +pub(crate) mod tls; +#[cfg(feature = "websocket")] pub mod websocket; diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index bcbb44f827..5b0ca31df7 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -46,7 +46,9 @@ pub use crate::stream_builder::IggyConsumerConfig; pub use crate::stream_builder::IggyStreamConsumer; pub use crate::stream_builder::{IggyProducerConfig, IggyStreamProducer}; pub use crate::stream_builder::{IggyStream, IggyStreamConfig}; +#[cfg(feature = "tcp")] pub use crate::tcp::tcp_client::TcpClient; +#[cfg(feature = "websocket")] pub use crate::websocket::websocket_client::WebSocketClient; pub use iggy_binary_protocol::{ Client, ClusterClient, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, diff --git a/core/sdk/src/tcp/tcp_tls_verifier.rs b/core/sdk/src/tcp/tcp_tls_verifier.rs index 5142a56e70..6809cf5c94 100644 --- a/core/sdk/src/tcp/tcp_tls_verifier.rs +++ b/core/sdk/src/tcp/tcp_tls_verifier.rs @@ -16,55 +16,4 @@ * under the License. */ -use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; -use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; -use rustls::{DigitallySignedStruct, Error, SignatureScheme}; - -#[derive(Debug)] -pub struct NoServerVerification; - -impl ServerCertVerifier for NoServerVerification { - fn verify_server_cert( - &self, - _end_entity: &CertificateDer<'_>, - _intermediates: &[CertificateDer<'_>], - _server_name: &ServerName<'_>, - _ocsp_response: &[u8], - _now: UnixTime, - ) -> Result { - Ok(ServerCertVerified::assertion()) - } - - fn verify_tls12_signature( - &self, - _message: &[u8], - _cert: &CertificateDer<'_>, - _dss: &DigitallySignedStruct, - ) -> Result { - Ok(HandshakeSignatureValid::assertion()) - } - - fn verify_tls13_signature( - &self, - _message: &[u8], - _cert: &CertificateDer<'_>, - _dss: &DigitallySignedStruct, - ) -> Result { - Ok(HandshakeSignatureValid::assertion()) - } - - fn supported_verify_schemes(&self) -> Vec { - vec![ - SignatureScheme::ECDSA_NISTP256_SHA256, - SignatureScheme::ECDSA_NISTP384_SHA384, - SignatureScheme::ECDSA_NISTP521_SHA512, - SignatureScheme::RSA_PSS_SHA256, - SignatureScheme::RSA_PSS_SHA384, - SignatureScheme::RSA_PSS_SHA512, - SignatureScheme::RSA_PKCS1_SHA256, - SignatureScheme::RSA_PKCS1_SHA384, - SignatureScheme::RSA_PKCS1_SHA512, - SignatureScheme::ED25519, - ] - } -} +pub use crate::tls::no_server_verification::NoServerVerification; diff --git a/core/sdk/src/tls/mod.rs b/core/sdk/src/tls/mod.rs new file mode 100644 index 0000000000..1fe87f7043 --- /dev/null +++ b/core/sdk/src/tls/mod.rs @@ -0,0 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +pub(crate) mod no_server_verification; diff --git a/core/sdk/src/tls/no_server_verification.rs b/core/sdk/src/tls/no_server_verification.rs new file mode 100644 index 0000000000..5142a56e70 --- /dev/null +++ b/core/sdk/src/tls/no_server_verification.rs @@ -0,0 +1,70 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; +use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; +use rustls::{DigitallySignedStruct, Error, SignatureScheme}; + +#[derive(Debug)] +pub struct NoServerVerification; + +impl ServerCertVerifier for NoServerVerification { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + SignatureScheme::ECDSA_NISTP256_SHA256, + SignatureScheme::ECDSA_NISTP384_SHA384, + SignatureScheme::ECDSA_NISTP521_SHA512, + SignatureScheme::RSA_PSS_SHA256, + SignatureScheme::RSA_PSS_SHA384, + SignatureScheme::RSA_PSS_SHA512, + SignatureScheme::RSA_PKCS1_SHA256, + SignatureScheme::RSA_PKCS1_SHA384, + SignatureScheme::RSA_PKCS1_SHA512, + SignatureScheme::ED25519, + ] + } +} diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index c64233dafe..d56c2016a1 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -391,7 +391,7 @@ impl WebSocketClient { .with_no_client_auth() } else { // skip certificate validation (development/self-signed certs) - use crate::tcp::tcp_tls_verifier::NoServerVerification; + use crate::tls::no_server_verification::NoServerVerification; rustls::ClientConfig::builder() .dangerous() .with_custom_certificate_verifier(Arc::new(NoServerVerification)) From 97dc578902dbdb64ef7442a3555a4d63cce93924 Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 3 Mar 2026 03:03:44 +0900 Subject: [PATCH 5/5] chore: sort Cargo.toml dependencies for sdk and server Signed-off-by: shin --- core/sdk/Cargo.toml | 7 ++++++- core/server/Cargo.toml | 8 +++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 08be860126..42b7f6564b 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -33,7 +33,12 @@ default = ["tokio_lock", "tcp", "quic", "http", "websocket"] tokio_lock = [] tcp = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"] quic = ["dep:quinn", "dep:rustls"] -http = ["dep:reqwest", "dep:reqwest-middleware", "dep:reqwest-retry", "dep:reqwest-tracing"] +http = [ + "dep:reqwest", + "dep:reqwest-middleware", + "dep:reqwest-retry", + "dep:reqwest-tracing", +] websocket = ["dep:tokio-tungstenite", "dep:tungstenite", "dep:rustls", "dep:webpki-roots"] [dependencies] diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3da89b560d..718d4d61c2 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -38,7 +38,13 @@ mimalloc = ["dep:mimalloc"] iggy-web = ["http", "dep:rust-embed", "dep:mime_guess"] tcp = ["iggy_common/tcp"] quic = ["iggy_common/quic"] -http = ["dep:axum", "dep:axum-server", "dep:cyper-axum", "dep:tower-http", "dep:jsonwebtoken"] +http = [ + "dep:axum", + "dep:axum-server", + "dep:cyper-axum", + "dep:tower-http", + "dep:jsonwebtoken", +] websocket = ["iggy_common/websocket", "configs/websocket", "dep:tungstenite"] [dependencies]