Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/verify-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ jobs:
libcurl4-openssl-dev \
pkg-config \
libsasl2-dev \
protobuf-compiler \
musl-tools
sudo ln -sf /usr/bin/musl-gcc /usr/local/bin/x86_64-linux-musl-gcc

- name: Cache Cargo
uses: Swatinem/rust-cache@v2
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "tim
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4", "v7"] }
log = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand All @@ -51,6 +51,7 @@ arrow = { version = "55", default-features = false }
arrow-array = "55"
arrow-ipc = "55"
arrow-schema = { version = "55", features = ["serde"] }
parquet = "55"
futures = "0.3"
serde_json_path = "0.7"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
Expand Down Expand Up @@ -78,3 +79,6 @@ governor = "0.8.0"
default = ["incremental-cache", "python"]
incremental-cache = ["wasmtime/incremental-cache"]
python = []

[dev-dependencies]
tempfile = "3.27.0"
86 changes: 74 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,50 @@

# ---------------------------------------------------------------------------
# Project metadata
# ---------------------------------------------------------------------------
APP_NAME := function-stream
# Crate version: first `version = "..."` line of Cargo.toml
VERSION := $(shell grep '^version' Cargo.toml | head -1 | awk -F '"' '{print $$2}')
# Build timestamp (UTC, ISO-8601)
DATE := $(shell date -u +"%Y-%m-%dT%H:%M:%SZ")

# 1. Auto-detect system environment & normalize architecture
RAW_ARCH := $(shell uname -m)
# macOS M-series reports arm64 while Rust triples expect aarch64;
# some platforms report amd64 where Rust expects x86_64.
ifeq ($(RAW_ARCH), arm64)
ARCH := aarch64
else ifeq ($(RAW_ARCH), amd64)
ARCH := x86_64
else
ARCH := $(RAW_ARCH)
endif

OS := $(shell uname -s | tr '[:upper:]' '[:lower:]')
OS_NAME := $(shell uname -s)

# 2. Configure RUSTFLAGS and target triple per platform
DIST_ROOT := dist
ifeq ($(OS_NAME), Linux)
# Linux: static-link musl for a truly self-contained, zero-dependency binary
TRIPLE := $(ARCH)-unknown-linux-musl
STATIC_FLAGS := -C target-feature=+crt-static
else ifeq ($(OS_NAME), Darwin)
# macOS: strip symbols but keep dynamic linking (Apple system restriction)
TRIPLE := $(ARCH)-apple-darwin
STATIC_FLAGS :=
else ifneq (,$(findstring MINGW,$(OS_NAME))$(findstring MSYS,$(OS_NAME)))
# Windows (Git Bash / MSYS2): static-link MSVC runtime
TRIPLE := $(ARCH)-pc-windows-msvc
STATIC_FLAGS := -C target-feature=+crt-static
else
# Fallback: assume a glibc Linux-like environment, dynamic linking
TRIPLE := $(ARCH)-unknown-linux-gnu
STATIC_FLAGS :=
endif

# 3. Aggressive optimization flags
# opt-level=z : size-oriented, minimize binary footprint
# strip=symbols: remove debug symbol table at link time
# Note: panic=abort is intentionally omitted to preserve stack unwinding
# for better fault tolerance in the streaming runtime
OPTIMIZE_FLAGS := -C opt-level=z -C strip=symbols $(STATIC_FLAGS)

# With an explicit --target, cargo places artifacts under target/<triple>/release
TARGET_DIR := target/$(TRIPLE)/release
PYTHON_ROOT := python
WASM_SOURCE := $(PYTHON_ROOT)/functionstream-runtime/target/functionstream-python-runtime.wasm

Expand All @@ -42,7 +80,7 @@ C_0 := \033[0m
# Logging helpers: $(1) = tag/message, colour variables defined above.
log = @printf "$(C_B)[-]$(C_0) %-15s %s\n" "$(1)" "$(2)"
success = @printf "$(C_G)[✔]$(C_0) %s\n" "$(1)"

# Single consolidated .PHONY declaration (includes internal .check-env,
# .ensure-target and .build-wasm helper targets).
.PHONY: all help build build-lite dist dist-lite clean test env env-clean go-sdk-env go-sdk-build go-sdk-clean docker docker-run docker-push .check-env .ensure-target .build-wasm

all: build

Expand All @@ -65,18 +103,42 @@ help:
@echo ""
@echo " Version: $(VERSION) | Arch: $(ARCH) | OS: $(OS)"

# 4. Auto-install missing Rust target toolchain.
# `grep -qx` matches the whole line so one triple cannot substring-match
# another (e.g. ...-linux-gnu vs ...-linux-gnux32).
.ensure-target:
	@rustup target list --installed | grep -qx "$(TRIPLE)" || \
	(printf "$(C_Y)[!] Auto-installing target toolchain for $(OS_NAME): $(TRIPLE)$(C_0)\n" && \
	rustup target add $(TRIPLE))

# 5. Full build: server with python feature + CLI, cross-compiled against
# the detected $(TRIPLE) with size-optimized, statically-linked RUSTFLAGS.
build: .check-env .ensure-target .build-wasm
	$(call log,BUILD,Rust Full [$(OS_NAME) / $(TRIPLE)])
	@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
	cargo build --release \
	--target $(TRIPLE) \
	--features python \
	--quiet
	$(call log,BUILD,CLI)
	@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
	cargo build --release \
	--target $(TRIPLE) \
	-p function-stream-cli \
	--quiet
	$(call success,Target: $(TARGET_DIR)/$(APP_NAME) $(TARGET_DIR)/cli)

# Lite build: no python feature (skips the WASM runtime), incremental-cache
# only; CLI is still built so dist packaging has both binaries.
build-lite: .check-env .ensure-target
	$(call log,BUILD,Rust Lite [$(OS_NAME) / $(TRIPLE)])
	@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
	cargo build --release \
	--target $(TRIPLE) \
	--no-default-features \
	--features incremental-cache \
	--quiet
	$(call log,BUILD,CLI for dist)
	@RUSTFLAGS="$(OPTIMIZE_FLAGS)" \
	cargo build --release \
	--target $(TRIPLE) \
	-p function-stream-cli \
	--quiet
	$(call success,Target: $(TARGET_DIR)/$(APP_NAME) $(TARGET_DIR)/cli)

.build-wasm:
Expand Down
8 changes: 8 additions & 0 deletions protocol/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ message StreamingTableDefinition {
// Stored as opaque bytes to avoid coupling storage schema with runtime API protos.
bytes fs_program_bytes = 3;
string comment = 4;

// User-specified checkpoint interval from WITH clause (e.g. 'checkpoint.interval' = '5000').
// 0 or unset means use system default.
uint64 checkpoint_interval_ms = 5;

// Last globally-committed checkpoint epoch.
// Updated by JobManager after all operators ACK. Used for crash recovery.
uint64 latest_checkpoint_epoch = 6;
}

// =============================================================================
Expand Down
27 changes: 18 additions & 9 deletions src/coordinator/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,25 +322,34 @@ impl PlanVisitor for Executor {
let job_manager: Arc<JobManager> = Arc::clone(&self.job_manager);

let job_id = plan.name.clone();
let job_id = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(job_manager.submit_job(job_id, fs_program.clone()))
})
.map_err(|e| ExecuteError::Internal(format!("Failed to submit streaming job: {e}")))?;

let custom_interval: Option<u64> = plan
.with_options
.as_ref()
.and_then(|opts| opts.get("checkpoint.interval"))
.and_then(|v| v.parse().ok());

self.catalog_manager
.persist_streaming_job(
&plan.name,
&fs_program,
plan.comment.as_deref().unwrap_or(""),
custom_interval.unwrap_or(0),
)
.map_err(|e| {
ExecuteError::Internal(format!(
"Streaming job '{}' submitted but persistence failed: {e}",
plan.name
))
ExecuteError::Internal(format!("Streaming job persistence failed: {e}",))
})?;

let job_id = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(job_manager.submit_job(
job_id,
fs_program,
custom_interval,
None,
))
})
.map_err(|e| ExecuteError::Internal(format!("Failed to submit streaming job: {e}")))?;

info!(
job_id = %job_id,
table = %plan.name,
Expand Down
18 changes: 18 additions & 0 deletions src/coordinator/plan/logical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,28 @@ impl LogicalPlanVisitor {

let validated_program = self.validate_graph_topology(&final_logical_plan)?;

let streaming_with_options: Option<std::collections::HashMap<String, String>> =
if with_options.is_empty() {
None
} else {
let map: std::collections::HashMap<String, String> = with_options
.iter()
.filter_map(|opt| match opt {
SqlOption::KeyValue { key, value } => Some((
key.value.clone(),
value.to_string().trim_matches('\'').to_string(),
)),
_ => None,
})
.collect();
if map.is_empty() { None } else { Some(map) }
};

Ok(StreamingTable {
name: sink_table_name,
comment: comment.clone(),
program: validated_program,
with_options: streaming_with_options,
})
}

Expand Down
3 changes: 3 additions & 0 deletions src/coordinator/plan/streaming_table_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
use crate::sql::logical_node::logical::LogicalProgram;

Expand All @@ -19,6 +21,7 @@ pub struct StreamingTable {
pub name: String,
pub comment: Option<String>,
pub program: LogicalProgram,
pub with_options: Option<HashMap<String, String>>,
}

impl PlanNode for StreamingTable {
Expand Down
28 changes: 28 additions & 0 deletions src/runtime/streaming/api/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

Expand All @@ -19,6 +20,7 @@ use arrow_array::RecordBatch;
use crate::runtime::streaming::memory::MemoryPool;
use crate::runtime::streaming::network::endpoint::PhysicalSender;
use crate::runtime::streaming::protocol::event::{StreamEvent, TrackedEvent};
use crate::runtime::streaming::state::{IoManager, MemoryController};

#[derive(Debug, Clone)]
pub struct TaskContextConfig {
Expand Down Expand Up @@ -61,16 +63,33 @@ pub struct TaskContext {

/// Subtask-level tunables.
config: TaskContextConfig,

/// Root directory for operator state persistence (LSM-Tree data/tombstone files).
pub state_dir: PathBuf,

/// Shared memory controller for state engine back-pressure.
pub memory_controller: Arc<MemoryController>,

/// I/O thread pool handle for background spill/compaction.
pub io_manager: IoManager,

/// Last globally-committed safe epoch for crash recovery.
safe_epoch: u64,
}

impl TaskContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
job_id: String,
pipeline_id: u32,
subtask_index: u32,
parallelism: u32,
downstream_senders: Vec<PhysicalSender>,
memory_pool: Arc<MemoryPool>,
memory_controller: Arc<MemoryController>,
io_manager: IoManager,
state_dir: PathBuf,
safe_epoch: u64,
) -> Self {
let task_name = format!(
"Task-[{}]-Pipe[{}]-Sub[{}/{}]",
Expand All @@ -87,9 +106,18 @@ impl TaskContext {
memory_pool,
current_watermark: None,
config: TaskContextConfig::default(),
state_dir,
memory_controller,
io_manager,
safe_epoch,
}
}

/// Returns the last globally-committed checkpoint epoch supplied at
/// construction time; used as the crash-recovery starting point.
#[inline]
pub fn latest_safe_epoch(&self) -> u64 {
self.safe_epoch
}

#[inline]
pub fn config(&self) -> &TaskContextConfig {
&self.config
Expand Down
Loading
Loading