Skip to content

Commit 561da59

Browse files
committed
update
1 parent 18a19f1 commit 561da59

33 files changed

+1001
-43
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/cli/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ arrow-schema = "52"
1414
comfy-table = "7"
1515
protocol = { path = "../../protocol" }
1616
clap = { version = "4.5", features = ["derive"] }
17-
thiserror = "2"
1817
tokio = { version = "1.0", features = ["full", "signal"] }
1918
tonic = { version = "0.12", features = ["default"] }
2019
rustyline = { version = "14.0", features = ["with-dirs"] }

cli/cli/src/repl.rs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,62 @@ use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, TableCompon
2020
use protocol::cli::{function_stream_service_client::FunctionStreamServiceClient, SqlRequest};
2121
use rustyline::error::ReadlineError;
2222
use rustyline::{Config, DefaultEditor, EditMode};
23+
use std::fmt;
2324
use std::io::{self, Cursor, Write};
2425
use std::sync::Arc;
2526
use tokio::sync::Mutex;
2627
use tonic::Request;
2728

28-
#[derive(Debug, thiserror::Error)]
29+
/// CLI errors.
30+
///
31+
/// **Important:** [`tonic::Status`] must not be formatted with `{}` — its [`fmt::Display`] dumps
32+
/// `details` / `metadata` (e.g. HTTP headers). Only [`tonic::Status::message`] is stored in
33+
/// [`ReplError::Rpc`].
34+
#[derive(Debug)]
2935
pub enum ReplError {
30-
#[error("RPC error: {0}")]
31-
Rpc(Box<tonic::Status>),
32-
#[error("Connection failed: {0}")]
36+
Rpc(String),
3337
Connection(String),
34-
#[error("Internal error: {0}")]
3538
Internal(String),
36-
#[error("IO error: {0}")]
37-
Io(#[from] io::Error),
39+
Io(io::Error),
40+
}
41+
42+
impl fmt::Display for ReplError {
43+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44+
match self {
45+
ReplError::Rpc(s) => f.write_str(s),
46+
ReplError::Connection(s) => f.write_str(s),
47+
ReplError::Internal(s) => write!(f, "Internal error: {s}"),
48+
ReplError::Io(e) => write!(f, "IO error: {e}"),
49+
}
50+
}
51+
}
52+
53+
impl std::error::Error for ReplError {
54+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
55+
match self {
56+
ReplError::Io(e) => Some(e),
57+
_ => None,
58+
}
59+
}
60+
}
61+
62+
impl From<io::Error> for ReplError {
63+
fn from(e: io::Error) -> Self {
64+
ReplError::Io(e)
65+
}
3866
}
3967

4068
impl From<tonic::Status> for ReplError {
4169
fn from(s: tonic::Status) -> Self {
42-
ReplError::Rpc(Box::new(s))
70+
let msg = s.message();
71+
if msg.is_empty() {
72+
ReplError::Rpc(format!(
73+
"gRPC {} (server returned no message)",
74+
s.code()
75+
))
76+
} else {
77+
ReplError::Rpc(msg.to_string())
78+
}
4379
}
4480
}
4581

conf/config.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,10 @@ task_storage:
117117

118118
# Maximum bytes for level base in bytes (optional)
119119
max_bytes_for_level_base: 268435456
120+
121+
# Stream table catalog (SQL: CREATE TABLE connector sources, SHOW TABLES, SHOW CREATE TABLE).
122+
# When persist is true (default), metadata is stored under RocksDB at db_path (default: data/stream_catalog)
123+
# and reloaded after process restart. Set persist: false only for tests/ephemeral nodes.
124+
stream_catalog:
125+
persist: true
126+
# db_path: data/stream_catalog

protocol/proto/storage.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ message StreamSource {
2727
bytes arrow_schema_ipc = 1;
2828
optional string event_time_field = 2;
2929
optional string watermark_field = 3;
30+
// Original CREATE TABLE ... WITH ('k'='v', ...) pairs (best-effort; keys sorted in DDL).
31+
map<string, string> with_options = 4;
3032
}
3133

3234
message StreamSink {

src/config/global_config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub struct GlobalConfig {
4040
pub task_storage: crate::config::storage::TaskStorageConfig,
4141
#[serde(default)]
4242
pub streaming: StreamingConfig,
43+
#[serde(default)]
44+
pub stream_catalog: crate::config::storage::StreamCatalogConfig,
4345
}
4446

4547
impl GlobalConfig {

src/config/storage.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,27 @@ impl Default for TaskStorageConfig {
118118
}
119119
}
120120
}
121+
122+
/// Stream table catalog (`CREATE TABLE` / `SHOW TABLES`) storage.
123+
#[derive(Debug, Clone, Serialize, Deserialize)]
124+
pub struct StreamCatalogConfig {
125+
/// When `false`, the catalog is in-memory only and is **lost on process restart**.
126+
#[serde(default = "default_stream_catalog_persist")]
127+
pub persist: bool,
128+
/// RocksDB directory for persisted catalog. Default: `{data_dir}/stream_catalog`.
129+
#[serde(default)]
130+
pub db_path: Option<String>,
131+
}
132+
133+
fn default_stream_catalog_persist() -> bool {
134+
true
135+
}
136+
137+
impl Default for StreamCatalogConfig {
138+
fn default() -> Self {
139+
Self {
140+
persist: default_stream_catalog_persist(),
141+
db_path: None,
142+
}
143+
}
144+
}

src/coordinator/analyze/analyzer.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ use super::Analysis;
1414
use crate::coordinator::execution_context::ExecutionContext;
1515
use crate::coordinator::statement::{
1616
CreateFunction, CreatePythonFunction, CreateTable, DropFunction, DropTableStatement,
17-
ShowFunctions, StartFunction, Statement, StatementVisitor, StatementVisitorContext,
18-
StatementVisitorResult, StopFunction, StreamingTableStatement,
17+
ShowCatalogTables, ShowCreateTable, ShowFunctions, StartFunction, Statement,
18+
StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
19+
StreamingTableStatement,
1920
};
2021
use std::fmt;
2122

@@ -109,6 +110,22 @@ impl StatementVisitor for Analyzer<'_> {
109110
StatementVisitorResult::Analyze(Box::new(stmt.clone()))
110111
}
111112

113+
fn visit_show_catalog_tables(
114+
&self,
115+
stmt: &ShowCatalogTables,
116+
_context: &StatementVisitorContext,
117+
) -> StatementVisitorResult {
118+
StatementVisitorResult::Analyze(Box::new(stmt.clone()))
119+
}
120+
121+
fn visit_show_create_table(
122+
&self,
123+
stmt: &ShowCreateTable,
124+
_context: &StatementVisitorContext,
125+
) -> StatementVisitorResult {
126+
StatementVisitorResult::Analyze(Box::new(stmt.clone()))
127+
}
128+
112129
fn visit_create_python_function(
113130
&self,
114131
stmt: &CreatePythonFunction,

src/coordinator/dataset/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212

1313
mod data_set;
1414
mod execute_result;
15+
mod show_catalog_tables_result;
16+
mod show_create_table_result;
1517
mod show_functions_result;
1618

1719
pub use data_set::{DataSet, empty_record_batch};
1820
pub use execute_result::ExecuteResult;
21+
pub use show_catalog_tables_result::ShowCatalogTablesResult;
22+
pub use show_create_table_result::ShowCreateTableResult;
1923
pub use show_functions_result::ShowFunctionsResult;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use std::sync::Arc;
14+
15+
use arrow_array::{Int32Array, StringArray};
16+
use arrow_schema::{DataType, Field, Schema};
17+
18+
use super::DataSet;
19+
use crate::sql::schema::{schema_columns_one_line, stream_table_row_detail, StreamTable};
20+
21+
#[derive(Clone, Debug)]
22+
pub struct ShowCatalogTablesResult {
23+
names: Vec<String>,
24+
kinds: Vec<String>,
25+
column_counts: Vec<i32>,
26+
schema_lines: Vec<String>,
27+
details: Vec<String>,
28+
}
29+
30+
impl ShowCatalogTablesResult {
31+
pub fn from_tables(tables: &[Arc<StreamTable>]) -> Self {
32+
let mut names = Vec::with_capacity(tables.len());
33+
let mut kinds = Vec::with_capacity(tables.len());
34+
let mut column_counts = Vec::with_capacity(tables.len());
35+
let mut schema_lines = Vec::with_capacity(tables.len());
36+
let mut details = Vec::with_capacity(tables.len());
37+
38+
for t in tables {
39+
let schema = t.schema();
40+
let ncols = schema.fields().len() as i32;
41+
names.push(t.name().to_string());
42+
kinds.push(match t.as_ref() {
43+
StreamTable::Source { .. } => "SOURCE",
44+
StreamTable::Sink { .. } => "SINK",
45+
}
46+
.to_string());
47+
column_counts.push(ncols);
48+
schema_lines.push(schema_columns_one_line(&schema));
49+
details.push(stream_table_row_detail(t.as_ref()));
50+
}
51+
52+
Self {
53+
names,
54+
kinds,
55+
column_counts,
56+
schema_lines,
57+
details,
58+
}
59+
}
60+
}
61+
62+
impl DataSet for ShowCatalogTablesResult {
63+
fn to_record_batch(&self) -> arrow_array::RecordBatch {
64+
let schema = Arc::new(Schema::new(vec![
65+
Field::new("table_name", DataType::Utf8, false),
66+
Field::new("kind", DataType::Utf8, false),
67+
Field::new("column_count", DataType::Int32, false),
68+
Field::new("schema_columns", DataType::Utf8, false),
69+
Field::new("details", DataType::Utf8, false),
70+
]));
71+
72+
arrow_array::RecordBatch::try_new(
73+
schema,
74+
vec![
75+
Arc::new(StringArray::from(
76+
self.names.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
77+
)),
78+
Arc::new(StringArray::from(
79+
self.kinds.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
80+
)),
81+
Arc::new(Int32Array::from(self.column_counts.clone())),
82+
Arc::new(StringArray::from(
83+
self.schema_lines.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
84+
)),
85+
Arc::new(StringArray::from(
86+
self.details.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
87+
)),
88+
],
89+
)
90+
.unwrap_or_else(|_| arrow_array::RecordBatch::new_empty(Arc::new(Schema::empty())))
91+
}
92+
}

0 commit comments

Comments
 (0)