
Commit 174ebaa

update

1 parent 9bed7e7

7 files changed: +346, -302 lines

protocol/proto/storage.proto

Lines changed: 10 additions & 10 deletions
@@ -10,30 +10,30 @@ syntax = "proto3";
 package function_stream.storage;
 
 // =============================================================================
-// Stream catalog (coordinator stream tables: source / sink)
+// Catalog table storage (coordinator SQL catalog)
 // =============================================================================
 
-// Top-level persisted record for one stream table.
+// Top-level persisted record for one catalog table.
 message TableDefinition {
   string table_name = 1;
   int64 updated_at_millis = 2;
   oneof table_type {
-    StreamSource source = 3;
-    StreamSink sink = 4;
+    // Connector-backed ingestion/egress table definition.
+    CatalogSourceTable connector_table = 3;
+    // Connector-backed lookup table definition.
+    CatalogSourceTable lookup_table = 5;
   }
 }
 
-message StreamSource {
+// Shared connector-backed table payload for connector/lookup entries.
+message CatalogSourceTable {
   bytes arrow_schema_ipc = 1;
   optional string event_time_field = 2;
   optional string watermark_field = 3;
   // Original CREATE TABLE ... WITH ('k'='v', ...) pairs (best-effort; keys sorted in DDL).
   map<string, string> with_options = 4;
-}
-
-message StreamSink {
-  bytes arrow_schema_ipc = 1;
-  bytes logical_program_bincode = 2;
+  // Canonical connector identifier (e.g. kafka, postgres-cdc).
+  string connector = 5;
 }
 
 // =============================================================================
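
A note on the wire format: the new oneof keeps tag 3 for connector_table and assigns tag 5 to lookup_table, leaving tag 4 (the retired StreamSink) unoccupied rather than reusing it. For orientation, here is a minimal, self-contained sketch of one persisted record, using hand-written stand-in types that mirror the shape prost-style codegen would produce for this file (an assumption; the commit does not show how the Rust types are generated):

// Stand-ins mirroring prost-style codegen for storage.proto (illustrative only).
use std::collections::HashMap;

#[derive(Clone, Debug, Default)]
pub struct CatalogSourceTable {
    pub arrow_schema_ipc: Vec<u8>,
    pub event_time_field: Option<String>,
    pub watermark_field: Option<String>,
    pub with_options: HashMap<String, String>,
    pub connector: String,
}

#[derive(Clone, Debug)]
pub enum TableType {
    ConnectorTable(CatalogSourceTable),
    LookupTable(CatalogSourceTable),
}

#[derive(Clone, Debug)]
pub struct TableDefinition {
    pub table_name: String,
    pub updated_at_millis: i64,
    pub table_type: Option<TableType>,
}

fn main() {
    // One persisted record for a connector-backed ingestion table.
    let record = TableDefinition {
        table_name: "events".into(),
        updated_at_millis: 1_700_000_000_000,
        table_type: Some(TableType::ConnectorTable(CatalogSourceTable {
            connector: "kafka".into(),
            ..Default::default()
        })),
    };
    println!("{record:?}");
}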

src/coordinator/dataset/show_catalog_tables_result.rs

Lines changed: 14 additions & 6 deletions
@@ -14,9 +14,11 @@ use std::sync::Arc;
 
 use arrow_array::{Int32Array, StringArray};
 use arrow_schema::{DataType, Field, Schema};
+use datafusion::arrow::datatypes::Schema as DfSchema;
 
 use super::DataSet;
-use crate::sql::schema::{schema_columns_one_line, stream_table_row_detail, StreamTable};
+use crate::sql::schema::table::Table as CatalogTable;
+use crate::sql::schema::{catalog_table_row_detail, schema_columns_one_line};
 
 #[derive(Clone, Debug)]
 pub struct ShowCatalogTablesResult {
@@ -28,25 +30,31 @@ pub struct ShowCatalogTablesResult {
 }
 
 impl ShowCatalogTablesResult {
-    pub fn from_tables(tables: &[Arc<StreamTable>]) -> Self {
+    pub fn from_tables(tables: &[Arc<CatalogTable>]) -> Self {
         let mut names = Vec::with_capacity(tables.len());
         let mut kinds = Vec::with_capacity(tables.len());
         let mut column_counts = Vec::with_capacity(tables.len());
         let mut schema_lines = Vec::with_capacity(tables.len());
         let mut details = Vec::with_capacity(tables.len());
 
         for t in tables {
-            let schema = t.schema();
+            let schema = match t.as_ref() {
+                CatalogTable::ConnectorTable(source) | CatalogTable::LookupTable(source) => {
+                    source.produce_physical_schema()
+                }
+                CatalogTable::TableFromQuery { .. } => DfSchema::new(t.get_fields()),
+            };
             let ncols = schema.fields().len() as i32;
             names.push(t.name().to_string());
             kinds.push(match t.as_ref() {
-                StreamTable::Source { .. } => "SOURCE",
-                StreamTable::Sink { .. } => "SINK",
+                CatalogTable::ConnectorTable(_) => "SOURCE",
+                CatalogTable::LookupTable(_) => "LOOKUP",
+                CatalogTable::TableFromQuery { .. } => "QUERY",
             }
             .to_string());
             column_counts.push(ncols);
             schema_lines.push(schema_columns_one_line(&schema));
-            details.push(stream_table_row_detail(t.as_ref()));
+            details.push(catalog_table_row_detail(t.as_ref()));
         }
 
         Self {
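
The TableFromQuery arm above falls back to building an Arrow schema directly from the table's fields instead of asking a connector for one. A self-contained illustration of that Schema::new(...) construction (field names invented for the example):

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Schema::new accepts anything convertible into `Fields`, e.g. a Vec<Field>.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload", DataType::Utf8, true),
    ]);
    // `fields().len() as i32` is exactly how the result grid counts columns above.
    assert_eq!(schema.fields().len() as i32, 2);
    println!("{} columns", schema.fields().len());
}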

src/coordinator/execution/executor.rs

Lines changed: 15 additions & 27 deletions
@@ -29,7 +29,8 @@ use crate::coordinator::plan::{
 use crate::coordinator::statement::{ConfigSource, FunctionSource};
 use crate::runtime::streaming::job::JobManager;
 use crate::runtime::taskexecutor::TaskManager;
-use crate::sql::schema::{show_create_stream_table, StreamTable};
+use crate::sql::schema::table::Table as CatalogTable;
+use crate::sql::schema::show_create_catalog_table;
 use crate::storage::stream_catalog::CatalogManager;
 
 #[derive(Error, Debug)]
@@ -201,7 +202,10 @@
         _plan: &ShowCatalogTablesPlan,
         _context: &PlanVisitorContext,
     ) -> PlanVisitorResult {
-        let tables = self.catalog_manager.list_stream_tables();
+        let tables = match self.catalog_manager.list_catalog_tables() {
+            Ok(tables) => tables,
+            Err(e) => return PlanVisitorResult::Execute(Err(ExecuteError::Internal(e.to_string()))),
+        };
         let n = tables.len();
         let result = ExecuteResult::ok_with_data(
             format!("{n} stream catalog table(s)"),
@@ -218,14 +222,15 @@
         let execute = || -> Result<ExecuteResult, ExecuteError> {
             let t = self
                 .catalog_manager
-                .get_stream_table(&plan.table_name)
+                .get_catalog_table(&plan.table_name)
+                .map_err(|e| ExecuteError::Internal(e.to_string()))?
                 .ok_or_else(|| {
                     ExecuteError::Validation(format!(
                         "Table '{}' not found in stream catalog",
                         plan.table_name
                     ))
                 })?;
-            let ddl = show_create_stream_table(t.as_ref());
+            let ddl = show_create_catalog_table(t.as_ref());
             Ok(ExecuteResult::ok_with_data(
                 format!("SHOW CREATE TABLE {}", plan.table_name),
                 ShowCreateTableResult::new(plan.table_name.clone(), ddl),
@@ -284,21 +289,13 @@
         _context: &PlanVisitorContext,
     ) -> PlanVisitorResult {
         let execute = || -> Result<ExecuteResult, ExecuteError> {
-            let (table_name, if_not_exists, stream_table) = match &plan.body {
+            let (table_name, if_not_exists, catalog_table) = match &plan.body {
                 CreateTablePlanBody::ConnectorSource {
                     source_table,
                     if_not_exists,
                 } => {
                     let table_name = source_table.name().to_string();
-                    let schema = Arc::new(source_table.produce_physical_schema());
-                    let table_instance = StreamTable::Source {
-                        name: table_name.clone(),
-                        connector: source_table.connector().to_string(),
-                        schema,
-                        event_time_field: source_table.event_time_field().map(str::to_string),
-                        watermark_field: source_table.stream_catalog_watermark_field(),
-                        with_options: source_table.catalog_with_options().clone(),
-                    };
+                    let table_instance = CatalogTable::ConnectorTable(source_table.clone());
                     (table_name, *if_not_exists, table_instance)
                 }
                 CreateTablePlanBody::DataFusion(_) => {
@@ -309,14 +306,14 @@
                 }
             };
 
-            if if_not_exists && self.catalog_manager.has_stream_table(&table_name) {
+            if if_not_exists && self.catalog_manager.has_catalog_table(&table_name) {
                 return Ok(ExecuteResult::ok(format!(
                     "Table '{table_name}' already exists (skipped)"
                 )));
            }
 
             self.catalog_manager
-                .add_table(stream_table)
+                .add_catalog_table(catalog_table)
                 .map_err(|e| {
                     ExecuteError::Internal(format!(
                         "Failed to register connector source table '{}': {}",
@@ -338,15 +335,6 @@
         _context: &PlanVisitorContext,
     ) -> PlanVisitorResult {
         let execute = || -> Result<ExecuteResult, ExecuteError> {
-            let sink = StreamTable::Sink {
-                name: plan.name.clone(),
-                program: plan.program.clone(),
-            };
-
-            self.catalog_manager
-                .add_table(sink)
-                .map_err(|e| ExecuteError::Internal(e.to_string()))?;
-
             let fs_program: FsProgram = plan.program.clone().into();
             let job_manager: Arc<JobManager> = Arc::clone(&self.job_manager);
 
@@ -359,7 +347,7 @@
             info!(
                 job_id = %job_id,
                 table = %plan.name,
-                "Streaming table registered and job submitted"
+                "Streaming job submitted"
             );
 
             Ok(ExecuteResult::ok_with_data(
@@ -398,7 +386,7 @@
     ) -> PlanVisitorResult {
         let execute = || -> Result<ExecuteResult, ExecuteError> {
             self.catalog_manager
-                .drop_table(&plan.table_name, plan.if_exists)
+                .drop_catalog_table(&plan.table_name, plan.if_exists)
                 .map_err(|e| ExecuteError::Internal(e.to_string()))?;
 
             Ok(ExecuteResult::ok(format!(
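
Two error-propagation styles appear in this file: visit_* methods return PlanVisitorResult directly, so the now-fallible list_catalog_tables call is unwrapped with an explicit match and early return, while paths wrapped in an execute closure get to use `?`. A self-contained sketch of the difference, with simplified stand-ins for the crate's PlanVisitorResult and ExecuteError:

// Simplified stand-ins; the real types are crate-internal.
enum VisitResult {
    Execute(Result<String, String>),
}

// Style 1: the function returns VisitResult, not Result, so `?` is unavailable
// and errors are matched and wrapped explicitly (as in visit_show_catalog_tables).
fn visit_direct(catalog: Result<Vec<String>, String>) -> VisitResult {
    let tables = match catalog {
        Ok(t) => t,
        Err(e) => return VisitResult::Execute(Err(format!("Internal: {e}"))),
    };
    VisitResult::Execute(Ok(format!("{} table(s)", tables.len())))
}

// Style 2: an inner closure returns Result, so `?` plus map_err reads linearly
// (as in the SHOW CREATE TABLE path).
fn visit_via_closure(catalog: Result<Vec<String>, String>) -> VisitResult {
    let execute = || -> Result<String, String> {
        let tables = catalog.map_err(|e| format!("Internal: {e}"))?;
        Ok(format!("{} table(s)", tables.len()))
    };
    VisitResult::Execute(execute())
}

fn main() {
    let results = [
        visit_direct(Ok(vec!["t1".into()])),
        visit_via_closure(Err("boom".into())),
    ];
    for VisitResult::Execute(inner) in results {
        println!("{inner:?}");
    }
}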

src/coordinator/runtime_context.rs

Lines changed: 3 additions & 46 deletions
@@ -18,11 +18,7 @@ use anyhow::Result;
 
 use crate::runtime::streaming::job::JobManager;
 use crate::runtime::taskexecutor::TaskManager;
-use crate::sql::schema::column_descriptor::ColumnDescriptor;
-use crate::sql::schema::connection_type::ConnectionType;
-use crate::sql::schema::source_table::SourceTable;
-use crate::sql::schema::table::Table as CatalogTable;
-use crate::sql::schema::{StreamSchemaProvider, StreamTable};
+use crate::sql::schema::StreamSchemaProvider;
 use crate::storage::stream_catalog::CatalogManager;
 
 /// Dependencies shared by analyze / plan / execute, analogous to installing globals in
@@ -32,7 +28,6 @@ pub struct CoordinatorRuntimeContext {
     pub task_manager: Arc<TaskManager>,
     pub catalog_manager: Arc<CatalogManager>,
     pub job_manager: Arc<JobManager>,
-    planning_schema_override: Option<StreamSchemaProvider>,
 }
 
 impl CoordinatorRuntimeContext {
@@ -44,61 +39,23 @@
                 .map_err(|e| anyhow::anyhow!("Failed to get CatalogManager: {}", e))?,
             job_manager: JobManager::global()
                 .map_err(|e| anyhow::anyhow!("Failed to get JobManager: {}", e))?,
-            planning_schema_override: None,
         })
     }
 
     pub fn new(
         task_manager: Arc<TaskManager>,
         catalog_manager: Arc<CatalogManager>,
         job_manager: Arc<JobManager>,
-        planning_schema_override: Option<StreamSchemaProvider>,
     ) -> Self {
         Self {
             task_manager,
             catalog_manager,
             job_manager,
-            planning_schema_override,
         }
     }
 
-    /// Schema provider for [`LogicalPlanVisitor`] / [`SqlToRel`]: override if set, else catalog snapshot.
+    /// Schema provider for [`LogicalPlanVisitor`] / [`SqlToRel`].
     pub fn planning_schema_provider(&self) -> StreamSchemaProvider {
-        let mut provider = self.catalog_manager.acquire_planning_context();
-
-        for (name, stream) in provider.tables.streams.clone() {
-            let StreamTable::Source {
-                name: source_name,
-                connector,
-                schema,
-                event_time_field,
-                watermark_field,
-                with_options,
-            } = stream.as_ref()
-            else {
-                continue;
-            };
-            let mut source = SourceTable::new(
-                source_name.clone(),
-                connector.clone(),
-                ConnectionType::Source,
-            );
-            source.schema_specs = schema
-                .fields()
-                .iter()
-                .map(|f| ColumnDescriptor::new_physical((**f).clone()))
-                .collect();
-            source.inferred_fields = Some(schema.fields().iter().cloned().collect());
-            source.temporal_config.event_column = event_time_field.clone();
-            source.temporal_config.watermark_strategy_column = watermark_field.clone();
-            source.catalog_with_options = with_options.clone();
-
-            provider
-                .tables
-                .catalogs
-                .insert(name, Arc::new(CatalogTable::ConnectorTable(source)));
-        }
-
-        provider
+        self.catalog_manager.acquire_planning_context()
     }
 }
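
With connector tables now persisted natively as CatalogTable::ConnectorTable (see the executor change above), the per-table re-synthesis loop is dead weight and the planning provider becomes a pure delegation to the catalog snapshot. A compact sketch of the resulting shape, with placeholder types standing in for the crate-internal ones:

use std::sync::Arc;

// Placeholders; the real CatalogManager / StreamSchemaProvider are crate-internal.
struct StreamSchemaProvider;
struct CatalogManager;

impl CatalogManager {
    fn acquire_planning_context(&self) -> StreamSchemaProvider {
        StreamSchemaProvider
    }
}

struct CoordinatorRuntimeContext {
    catalog_manager: Arc<CatalogManager>,
}

impl CoordinatorRuntimeContext {
    // Planning now sees exactly what the catalog persisted; no rebuild step,
    // and no override to keep in sync.
    fn planning_schema_provider(&self) -> StreamSchemaProvider {
        self.catalog_manager.acquire_planning_context()
    }
}

fn main() {
    let ctx = CoordinatorRuntimeContext {
        catalog_manager: Arc::new(CatalogManager),
    };
    let _provider = ctx.planning_schema_provider();
}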

src/sql/schema/catalog_ddl.rs

Lines changed: 47 additions & 0 deletions
@@ -17,6 +17,7 @@ use std::collections::BTreeMap;
 use datafusion::arrow::datatypes::{DataType, TimeUnit};
 
 use super::schema_provider::StreamTable;
+use super::table::Table as CatalogTable;
 use crate::sql::logical_node::logical::LogicalProgram;
 
 fn data_type_sql(dt: &DataType) -> String {
@@ -204,3 +205,49 @@ pub fn show_create_stream_table(table: &StreamTable) -> String {
         }
     }
 }
+
+/// Extra fields for `SHOW TABLES` result grid for persisted catalog rows.
+pub fn catalog_table_row_detail(table: &CatalogTable) -> String {
+    match table {
+        CatalogTable::ConnectorTable(source) => format!(
+            "kind=connector, connector={}, event_time={:?}, watermark={:?}, with_options={}",
+            source.connector(),
+            source.event_time_field(),
+            source.temporal_config.watermark_strategy_column,
+            source.catalog_with_options().len()
+        ),
+        CatalogTable::LookupTable(source) => format!(
+            "kind=lookup, connector={}, event_time={:?}, watermark={:?}, with_options={}",
+            source.connector(),
+            source.event_time_field(),
+            source.temporal_config.watermark_strategy_column,
+            source.catalog_with_options().len()
+        ),
+        CatalogTable::TableFromQuery { .. } => "kind=query".to_string(),
+    }
+}
+
+/// Human-readable `SHOW CREATE TABLE` text for persisted catalog rows.
+pub fn show_create_catalog_table(table: &CatalogTable) -> String {
+    match table {
+        CatalogTable::ConnectorTable(source) | CatalogTable::LookupTable(source) => {
+            let schema = source.produce_physical_schema();
+            let cols = format_columns(&schema);
+            let mut ddl = format!("CREATE TABLE {} (\n{}\n)", source.name(), cols.join(",\n"));
+            if let Some(e) = source.event_time_field() {
+                ddl.push_str(&format!("\n/* EVENT TIME COLUMN: {e} */\n"));
+            }
+            if let Some(w) = source.temporal_config.watermark_strategy_column.as_deref() {
+                ddl.push_str(&format!("/* WATERMARK: {w} */\n"));
+            }
+            let mut opts = source.catalog_with_options().clone();
+            opts.entry("connector".to_string())
+                .or_insert_with(|| source.connector().to_string());
+            ddl.push_str(&format_with_clause(&opts));
+            ddl
+        }
+        CatalogTable::TableFromQuery { name, .. } => {
+            format!("CREATE TABLE {name} AS SELECT ...;\n/* logical query text is not persisted */\n")
+        }
+    }
+}
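
To make the DDL output concrete, here is roughly what show_create_catalog_table emits for a hypothetical Kafka-backed connector table. The exact column and WITH-clause rendering comes from format_columns / format_with_clause, which are outside this hunk, so the details below are assumptions about shape, not captured output:

fn main() {
    // Illustrative only: assembled by hand to match the construction order
    // above (columns, then event-time/watermark comments, then the WITH
    // clause, with 'connector' always injected into the options).
    let illustrative_ddl = "\
CREATE TABLE events (
  id BIGINT,
  ts TIMESTAMP
)
/* EVENT TIME COLUMN: ts */
/* WATERMARK: ts */
WITH ('connector' = 'kafka')";
    println!("{illustrative_ddl}");
}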

src/sql/schema/mod.rs

Lines changed: 4 additions & 1 deletion
@@ -23,7 +23,10 @@ pub mod table_role;
 pub mod temporal_pipeline_config;
 pub mod utils;
 
-pub use catalog_ddl::{schema_columns_one_line, show_create_stream_table, stream_table_row_detail};
+pub use catalog_ddl::{
+    catalog_table_row_detail, schema_columns_one_line, show_create_catalog_table,
+    show_create_stream_table, stream_table_row_detail,
+};
 pub use column_descriptor::ColumnDescriptor;
 pub use connection_type::ConnectionType;
 pub use source_table::{SourceOperator, SourceTable};
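
Because both generations of helpers are re-exported side by side, call sites inside this crate can migrate incrementally. A sketch of a mixed import (valid only within the crate, shown for orientation):

use crate::sql::schema::{
    catalog_table_row_detail, show_create_catalog_table, // new catalog-table helpers
    show_create_stream_table, stream_table_row_detail,   // legacy stream-table helpers
};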
