Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"

[features]
default = ["tcp", "quic", "websocket"]
tcp = []
quic = []
websocket = ["dep:tungstenite"]

[dependencies]
aes-gcm = { workspace = true }
ahash = { workspace = true }
Expand Down Expand Up @@ -60,7 +66,7 @@ strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tungstenite = { workspace = true }
tungstenite = { workspace = true, optional = true }
twox-hash = { workspace = true }
ulid = { workspace = true }
uuid = { workspace = true }
Expand Down
10 changes: 7 additions & 3 deletions core/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ pub use commands::system::*;
pub use commands::topics::*;
pub use commands::users::*;
pub use deduplication::MessageDeduplicator;
pub use sender::{
QuicSender, Sender, SenderKind, TcpSender, TcpTlsSender, WebSocketSender, WebSocketTlsSender,
};
#[cfg(feature = "quic")]
pub use sender::QuicSender;
pub use sender::{Sender, SenderKind};
#[cfg(feature = "tcp")]
pub use sender::{TcpSender, TcpTlsSender};
#[cfg(feature = "websocket")]
pub use sender::{WebSocketSender, WebSocketTlsSender};
pub use traits::bytes_serializable::BytesSerializable;
pub use traits::partitioner::Partitioner;
pub use traits::sizeable::Sizeable;
Expand Down
35 changes: 34 additions & 1 deletion core/common/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,43 @@
* under the License.
*/

#[cfg(not(any(feature = "tcp", feature = "quic", feature = "websocket")))]
compile_error!(
"At least one protocol feature (tcp, quic, websocket) must be enabled for iggy_common"
);

#[cfg(feature = "quic")]
mod quic_sender;
#[cfg(feature = "tcp")]
mod tcp_sender;
#[cfg(feature = "tcp")]
mod tcp_tls_sender;
#[cfg(feature = "websocket")]
mod websocket_sender;
#[cfg(feature = "websocket")]
mod websocket_tls_sender;

#[cfg(feature = "quic")]
pub use quic_sender::QuicSender;
#[cfg(feature = "tcp")]
pub use tcp_sender::TcpSender;
#[cfg(feature = "tcp")]
pub use tcp_tls_sender::TcpTlsSender;
#[cfg(feature = "websocket")]
pub use websocket_sender::WebSocketSender;
#[cfg(feature = "websocket")]
pub use websocket_tls_sender::WebSocketTlsSender;

use crate::IggyError;
use crate::alloc::buffer::PooledBuffer;
use compio::BufResult;
use compio::buf::IoBufMut;
use compio::io::{AsyncReadExt, AsyncWriteExt};
#[cfg(any(feature = "tcp", feature = "websocket"))]
use compio::net::TcpStream;
#[cfg(feature = "quic")]
use compio::quic::{RecvStream, SendStream};
#[cfg(feature = "tcp")]
use compio::tls::TlsStream;
use std::future::Future;
#[cfg(unix)]
Expand All @@ -58,10 +76,15 @@ macro_rules! forward_async_methods {
$(<$($generic $(: $bound)?),+>)?
(&mut self, $( $arg: $arg_ty ),* ) -> $ret {
match self {
#[cfg(feature = "tcp")]
Self::Tcp(d) => d.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
#[cfg(feature = "tcp")]
Self::TcpTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
#[cfg(feature = "quic")]
Self::Quic(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
#[cfg(feature = "websocket")]
Self::WebSocket(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
#[cfg(feature = "websocket")]
Self::WebSocketTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
}
}
Expand All @@ -88,40 +111,50 @@ pub trait Sender {
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SenderKind {
#[cfg(feature = "tcp")]
Tcp(TcpSender),
#[cfg(feature = "tcp")]
TcpTls(TcpTlsSender),
#[cfg(feature = "quic")]
Quic(QuicSender),
#[cfg(feature = "websocket")]
WebSocket(WebSocketSender),
#[cfg(feature = "websocket")]
WebSocketTls(WebSocketTlsSender),
}

impl SenderKind {
#[cfg(feature = "tcp")]
pub fn get_tcp_sender(stream: TcpStream) -> Self {
Self::Tcp(TcpSender {
stream: Some(stream),
})
}

#[cfg(feature = "tcp")]
pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
Self::TcpTls(TcpTlsSender { stream })
}

#[cfg(feature = "quic")]
pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) -> Self {
Self::Quic(QuicSender {
send: send_stream,
recv: recv_stream,
})
}

#[cfg(feature = "websocket")]
pub fn get_websocket_sender(stream: WebSocketSender) -> Self {
Self::WebSocket(stream)
}

#[cfg(feature = "websocket")]
pub fn get_websocket_tls_sender(stream: WebSocketTlsSender) -> Self {
Self::WebSocketTls(stream)
}

#[cfg(unix)]
#[cfg(all(unix, feature = "tcp"))]
pub fn take_and_migrate_tcp(&mut self) -> Option<OwnedFd> {
match self {
SenderKind::Tcp(tcp_sender) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::types::configuration::websocket_config::websocket_connection_string_o
use crate::{AutoLogin, IggyDuration, WebSocketClientReconnectionConfig};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
#[cfg(feature = "websocket")]
use tungstenite::protocol::WebSocketConfig as TungsteniteConfig;

/// Configuration for the WebSocket client.
Expand Down Expand Up @@ -82,20 +83,34 @@ impl Default for WebSocketClientConfig {

impl Default for WebSocketConfig {
fn default() -> Self {
// Use tungstenite defaults
let tungstenite_config = TungsteniteConfig::default();
WebSocketConfig {
read_buffer_size: Some(tungstenite_config.read_buffer_size),
write_buffer_size: Some(tungstenite_config.write_buffer_size),
max_write_buffer_size: Some(tungstenite_config.max_write_buffer_size),
max_message_size: tungstenite_config.max_message_size,
max_frame_size: tungstenite_config.max_frame_size,
accept_unmasked_frames: false,
#[cfg(feature = "websocket")]
{
let tungstenite_config = TungsteniteConfig::default();
WebSocketConfig {
read_buffer_size: Some(tungstenite_config.read_buffer_size),
write_buffer_size: Some(tungstenite_config.write_buffer_size),
max_write_buffer_size: Some(tungstenite_config.max_write_buffer_size),
max_message_size: tungstenite_config.max_message_size,
max_frame_size: tungstenite_config.max_frame_size,
accept_unmasked_frames: false,
}
}
#[cfg(not(feature = "websocket"))]
{
WebSocketConfig {
read_buffer_size: Some(128 * 1024),
write_buffer_size: Some(128 * 1024),
max_write_buffer_size: Some(usize::MAX),
max_message_size: Some(64 << 20),
max_frame_size: Some(16 << 20),
accept_unmasked_frames: false,
}
}
}
}

impl WebSocketConfig {
#[cfg(feature = "websocket")]
/// Convert to tungstenite WebSocketConfig so we can use tungstenite defaults
pub fn to_tungstenite_config(&self) -> TungsteniteConfig {
let mut config = TungsteniteConfig::default();
Expand Down
6 changes: 5 additions & 1 deletion core/configs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ edition = "2024"
license = "Apache-2.0"
publish = false

[features]
default = ["websocket"]
websocket = ["dep:tungstenite"]

[dependencies]
configs_derive = { workspace = true }
derive_more = { workspace = true }
Expand All @@ -35,4 +39,4 @@ serde_with = { workspace = true }
static-toml = { workspace = true }
strum = { workspace = true }
tracing = { workspace = true }
tungstenite = { workspace = true }
tungstenite = { workspace = true, optional = true }
2 changes: 2 additions & 0 deletions core/configs/src/server_config/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use configs::ConfigEnv;
use iggy_common::IggyByteSize;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
#[cfg(feature = "websocket")]
use tungstenite::protocol::WebSocketConfig as TungsteniteConfig;

#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct WebSocketTlsConfig {
}

impl WebSocketConfig {
#[cfg(feature = "websocket")]
pub fn to_tungstenite_config(&self) -> TungsteniteConfig {
let mut config = TungsteniteConfig::default();

Expand Down
33 changes: 23 additions & 10 deletions core/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"

[features]
default = ["tokio_lock", "tcp", "quic", "http", "websocket"]
tokio_lock = []
tcp = ["dep:tokio-rustls", "dep:rustls", "dep:webpki-roots"]
quic = ["dep:quinn", "dep:rustls"]
http = [
"dep:reqwest",
"dep:reqwest-middleware",
"dep:reqwest-retry",
"dep:reqwest-tracing",
]
websocket = ["dep:tokio-tungstenite", "dep:tungstenite", "dep:rustls", "dep:webpki-roots"]

[dependencies]
async-broadcast = { workspace = true }
async-dropper = { workspace = true }
Expand All @@ -40,20 +53,20 @@ futures = { workspace = true }
futures-util = { workspace = true }
iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
quinn = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
reqwest-retry = { workspace = true }
reqwest-tracing = { workspace = true }
rustls = { workspace = true }
quinn = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }
reqwest-middleware = { workspace = true, optional = true }
reqwest-retry = { workspace = true, optional = true }
reqwest-tracing = { workspace = true, optional = true }
rustls = { workspace = true, optional = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-rustls = { workspace = true }
tokio-tungstenite = { workspace = true }
tokio-rustls = { workspace = true, optional = true }
tokio-tungstenite = { workspace = true, optional = true }
tracing = { workspace = true }
trait-variant = { workspace = true }
tungstenite = { workspace = true }
webpki-roots = { workspace = true }
tungstenite = { workspace = true, optional = true }
webpki-roots = { workspace = true, optional = true }

[dev-dependencies]
mockall = { workspace = true }
Loading
Loading