Skip to content

Commit 0321c50

Browse files
committed
update
1 parent 58a9e5c commit 0321c50

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1889
-1837
lines changed

src/runtime/streaming/operators/grouping/incremental_aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::runtime::streaming::arrow::decode_aggregate;
4949
use crate::runtime::streaming::operators::{Key, UpdatingCache};
5050
use crate::runtime::streaming::StreamOutput;
5151
use crate::sql::common::{to_nanos, CheckpointBarrier, FsSchema, Watermark, TIMESTAMP_FIELD, UPDATING_META_FIELD};
52-
use crate::sql::logical_planner::updating_meta_fields;
52+
use crate::sql::physical::updating_meta_fields;
5353

5454
#[derive(Debug, Copy, Clone)]
5555
struct BatchData {

src/runtime/streaming/operators/joins/join_instance.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ use crate::runtime::streaming::api::operator::{MessageOperator, Registry};
3434
use async_trait::async_trait;
3535
use protocol::grpc::api::JoinOperator;
3636
use crate::runtime::streaming::StreamOutput;
37+
use crate::sql::common::constants::mem_exec_join_side;
3738
use crate::sql::common::{from_nanos, CheckpointBarrier, FsSchema, FsSchemaRef, Watermark};
38-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
39+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
3940

4041
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
4142
enum JoinSide {
@@ -47,8 +48,8 @@ impl JoinSide {
4748
#[allow(dead_code)]
4849
fn name(&self) -> &'static str {
4950
match self {
50-
JoinSide::Left => "left",
51-
JoinSide::Right => "right",
51+
JoinSide::Left => mem_exec_join_side::LEFT,
52+
JoinSide::Right => mem_exec_join_side::RIGHT,
5253
}
5354
}
5455
}

src/runtime/streaming/operators/joins/join_with_expiration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use async_trait::async_trait;
3333
use protocol::grpc::api::JoinOperator;
3434
use crate::runtime::streaming::StreamOutput;
3535
use crate::sql::common::{CheckpointBarrier, FsSchema, Watermark};
36-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
36+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
3737

3838
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
3939
enum JoinSide {

src/runtime/streaming/operators/stateless_physical_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use futures::StreamExt;
2727
use prost::Message;
2828

2929
use crate::runtime::streaming::api::operator::Registry;
30-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
30+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
3131

3232
pub struct StatelessPhysicalExecutor {
3333
batch: Arc<RwLock<Option<RecordBatch>>>,

src/runtime/streaming/operators/windows/session_aggregating_window.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use protocol::grpc::api::SessionWindowAggregateOperator;
4444
use crate::runtime::streaming::StreamOutput;
4545
use crate::sql::common::{from_nanos, to_nanos, CheckpointBarrier, FsSchema, FsSchemaRef, Watermark};
4646
use crate::sql::common::converter::Converter;
47-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
47+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
4848
use crate::sql::schema::utils::window_arrow_struct;
4949
// ============================================================================
5050
// 领域模型与纯内存状态

src/runtime/streaming/operators/windows/sliding_aggregating_window.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::runtime::streaming::api::operator::Registry;
4343
use protocol::grpc::api::SlidingWindowAggregateOperator;
4444
use crate::runtime::streaming::StreamOutput;
4545
use crate::sql::common::{from_nanos, to_nanos, CheckpointBarrier, FsSchema, Watermark};
46-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
46+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
4747
// ============================================================================
4848
// 纯内存状态:阶梯式时间面板 (Tiered panes)
4949
// 这部分本身就是极佳的内存数据结构,原样保留!

src/runtime/streaming/operators/windows/tumbling_aggregating_window.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use protocol::grpc::api::TumblingWindowAggregateOperator;
4444
use crate::runtime::streaming::StreamOutput;
4545
use crate::sql::common::{from_nanos, to_nanos, CheckpointBarrier, FsSchema, Watermark};
4646
use crate::sql::common::time_utils::print_time;
47-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
47+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
4848
use crate::sql::schema::utils::add_timestamp_field_arrow;
4949

5050
struct ActiveBin {

src/runtime/streaming/operators/windows/window_function.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use async_trait::async_trait;
3737
use crate::runtime::streaming::StreamOutput;
3838
use crate::sql::common::{from_nanos, CheckpointBarrier, FsSchema, FsSchemaRef, Watermark};
3939
use crate::sql::common::time_utils::print_time;
40-
use crate::sql::logical_planner::{DecodingContext, FsPhysicalExtensionCodec};
40+
use crate::sql::physical::{DecodingContext, FsPhysicalExtensionCodec};
4141

4242
// ============================================================================
4343
// 纯内存执行缓冲区

src/sql/analysis/async_udf_rewriter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
// limitations under the License.
1212

1313
use crate::sql::extensions::remote_table::RemoteTableBoundaryNode;
14-
use crate::sql::extensions::{ASYNC_RESULT_FIELD, AsyncFunctionExecutionNode};
14+
use crate::sql::common::constants::sql_field;
15+
use crate::sql::extensions::AsyncFunctionExecutionNode;
1516
use crate::sql::schema::StreamSchemaProvider;
1617
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
1718
use datafusion::common::{Column, Result as DFResult, TableReference, plan_err};
@@ -55,7 +56,7 @@ impl<'a> AsyncUdfRewriter<'a> {
5556
);
5657
}
5758
return Ok(Transformed::yes(Expr::Column(Column::new_unqualified(
58-
ASYNC_RESULT_FIELD,
59+
sql_field::ASYNC_RESULT,
5960
))));
6061
}
6162
}

src/sql/analysis/join_rewriter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::sql::extensions::join::StreamingJoinNode;
1515
use crate::sql::extensions::key_calculation::KeyExtractionNode;
1616
use crate::sql::analysis::streaming_window_analzer::StreamingWindowAnalzer;
1717
use crate::sql::types::{WindowType, fields_with_qualifiers, schema_from_df_fields_with_metadata};
18+
use crate::sql::common::constants::mem_exec_join_side;
1819
use crate::sql::common::TIMESTAMP_FIELD;
1920
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
2021
use datafusion::common::{
@@ -198,8 +199,8 @@ impl TreeNodeRewriter for JoinRewriter<'_> {
198199

199200
// 2. Prepare Keyed Inputs for Shuffle
200201
let (left_on, right_on): (Vec<_>, Vec<_>) = join.on.clone().into_iter().unzip();
201-
let keyed_left = self.build_keyed_side(join.left, left_on, "left")?;
202-
let keyed_right = self.build_keyed_side(join.right, right_on, "right")?;
202+
let keyed_left = self.build_keyed_side(join.left, left_on, mem_exec_join_side::LEFT)?;
203+
let keyed_right = self.build_keyed_side(join.right, right_on, mem_exec_join_side::RIGHT)?;
203204

204205
// 3. Assemble Rewritten Join Node
205206
let join_schema = Arc::new(build_join_schema(

0 commit comments

Comments
 (0)