Skip to content

refactor(loader): support concurrent readers, short-id & Graphsrc#683

Merged
imbajin merged 46 commits intoapache:masterfrom
sadwitdastreetz:update-loader
Oct 29, 2025
Merged

refactor(loader): support concurrent readers, short-id & Graphsrc#683
imbajin merged 46 commits intoapache:masterfrom
sadwitdastreetz:update-loader

Conversation

@sadwitdastreetz
Copy link
Copy Markdown
Contributor

@sadwitdastreetz sadwitdastreetz commented Sep 16, 2025

Purpose of the PR

This PR is a part of updating HugegraphLoader to 2.0 and most importantly, it is NOT ready yet. It introduces a major refactor and enhancement to the HugeGraph Loader, aiming to improve parallelism, stability, and compatibility during data loading.
It includes:

  • More flexible file/HDFS source support
  • Stream-based JDBC fetching (especially for Oracle/MySQL large tables)
  • Short-ID support with schema proxy injection
  • Graph-to-Graph schema synchronization
  • Improved concurrency model for input progress tracking

These changes address issues with performance bottlenecks, Kerberos token expiration, Oracle missing rows, and lack of schema compatibility when importing from another graph.

Main Changes

  1. Concurrency Testing: Verify thread safety under high load
  2. Graph Source Testing: Validate Graph-to-Graph migration scenarios
  3. Error Recovery: Test failure handling under various error conditions
  4. Performance Benchmarking: Compare throughput with previous implementation
  5. Resource Leak Testing: Ensure proper cleanup under error conditions

Loader

Refactored HugeGraphLoader with concurrent loading, Graph source support, and improved error handling.

Major Changes

Concurrency

  • Added multi-threaded loading with ExecutorService and CompletableFuture
  • Configurable parallelism via parallelCount option
  • Scatter mode for distributed loading

Graph Source Support

  • Added Graph-to-Graph data migration
  • Schema replication from source graphs
  • Vertex/edge label migration with property filtering
  • Index replication with field validation

Schema Management

  • Automatic graph creation with backend configuration
  • Short ID configuration support
  • Enhanced Groovy script execution

Error Handling

  • Improved ServerException handling
  • Better exception chain preservation
  • Modified exception propagation behavior

API Changes

  • load() always returns true instead of context.noError()
  • Added checkGraphExists() and setGraphMode() methods
  • Commented out reader.confirmOffset() in loadStruct()
  • Simplified main() error handling

Breaking Changes

  • Exception handling behavior modified
  • Resource cleanup sequence changed
  • Return value semantics changed in load()

Source Layer

  • FileSource: added dir_filter, extra_date_formats, headerCaseSensitive, and splitCount for flexible directory/file filtering and single-file parallel reading.
  • HDFSSource:
    • Kerberos auto-renewal via scheduled task.
    • Replaced prefix matching with FileFilter + DirFilter, supports recursive directory traversal.
    • Unified error handling with LoadException.
  • GraphSource (new): sync schema directly from another HugeGraph instance.

Reader Layer

  • FileReader: refactored init() and moved scan logic into split() → multiple sub-readers per file.
  • HDFSFileReader: fixed subdirectory traversal bug (using wrong variable), added DirFilter.
  • JDBCReader: replaced RowFetcher with streaming JDBCFetcher to avoid Oracle data loss and improve performance.
  • FineLineFetcher: added null checks to avoid NPE.

Progress Layer

  • InputProgress: refactored from single loadingItem to Map<String, InputItemProgress> for multi-file concurrent tracking.
    • Thread-safety improvements with synchronized maps.
    • New markLoaded(Readable, boolean) API for fine-grained progress confirmation.

Filter Layer

  • ShortId support:
    • Added ShortIdParser, ShortIdConfig.
    • Schema enhancement via SchemaManagerProxy and VertexLabelProxy using reflection, injecting short-id handling transparently into HugeClient.

Options

  • Extended LoadOptions with new cluster, graph, and loading optimization flags (--scatter-sources, --short-id, --restore, etc.).
  • Added dumpParams() to log all runtime parameters.

Others

  • Added GlobalExecutorManager for thread pool management.
  • Updated FileLoadTest to adapt to InputProgress refactor.
sequenceDiagram
  autonumber
  actor CLI as 用户(CLI)
  participant Loader as HugeGraphLoader
  participant Options as LoadOptions
  participant Ctx as LoadContext
  participant Exec as GlobalExecutorManager
  participant Reader as InputReader(s)
  participant Parse as ElementParseGroup
  participant ShortId as ShortIdParser
  participant Client as HugeClient

  CLI->>Loader: new(args) / load()
  Loader->>Options: 解析并设置并行/shortId 等
  Loader->>Ctx: init(Options) -> 创建 indirectClient, filterGroup
  Loader->>Exec: getExecutor(parallel)
  Loader->>Reader: create reader / split() (若 multiReaders)
  par 并行处理每个 InputTaskItem
    Reader-->>Loader: emit Line 或 GraphElement
    Loader->>Parse: filter(element)
    Parse-->>Loader: true / false
    alt 通过过滤
      Loader->>ShortId: 可能转换短ID
      ShortId-->>Loader: 更新 element.id
      Loader->>Client: 写入(提交/flush 由配置或每行触发)
    else 过滤掉
      Loader-->>Loader: 跳过
    end
    Loader->>Ctx: 标记进度 (loaded_items/loading_items map)
  end
  Loader->>Exec: shutdown()
  Loader-->>CLI: 返回完成/异常

Loading
sequenceDiagram
  autonumber
  participant GSrc as GraphSource
  participant GReader as GraphReader
  participant Fetch as GraphFetcher
  participant Client as HugeClient
  participant Loader as HugeGraphLoader

  GSrc->>GReader: new(GraphSource)
  GReader->>Client: createHugeClient()
  loop 批量拉取
    GReader->>Fetch: queryBatch(offset, size)
    Fetch->>Client: 执行查询
    Client-->>Fetch: 返回元素批次
    Fetch-->>GReader: elements
    GReader-->>Loader: 生成 Line(交由 NopBuilder/过滤链处理)
  end

Loading

Does this PR potentially affect the following parts?

  • Modify configurations (LoadOptions extended)
  • The public API (Loader command line options & progress API)
  • Dependencies
  • Other impacts

Documentation Status

  • Doc - TODO (need to update loader usage doc)

added method InputReader.multiReaders() and adapted for all SOURCE
多文件输入这部分还没确认完成,初步进展

相应配置 & 细节更改:
1. FileSource 新增了 dir_filter 和 extra_date_formats 参数,并修改了构造函数;并增加了 ORC/Parquet 文件表头不区分大小写的支持FileSource.headerCaseSensitive以及单文件应用的splitCount,提升了文件加载的灵活性和兼容性。
2. InputSource加入headerCaseSensitive()默认区分大小写
多文件输入功能
FileReader.java
  init() 只负责调用 progress(context, struct),不再扫描文件。
  文件扫描和 reader 分裂逻辑移到了 split() 方法:
    调用 scanReadables() 获取所有文件
    排序
    创建多个 FileReader 子实例,每个对应一个文件

InputProgress.java
新版特点
- 进度管理基于 文件名 -> InputItemProgress 的 Map
- 可以同时跟踪多个文件的加载状态(已加载 / 正在加载)
- 支持 多线程并发 和更精细的控制(比如只确认某个文件的 offset,或者只标记某个文件 loaded)

相关接口重构
  旧版
  - loadingItem():返回单个 loadingItem
  - addLoadingItem(InputItemProgress):替换当前 loadingItem,旧的丢到 loadingItems
  - loadingOffset():返回当前 loadingItem.offset()
  - markLoaded(boolean markAll):

  新版
  - loadingItem(String name):按文件名查找对应的 loadingItem
  - addLoadingItem(String name, InputItemProgress):按文件名新增
  - 取消了 loadingOffset(),因为已经支持多文件了,offset 必须按文件取
  - markLoaded(Readable readable, boolean markAll):
    - 如果传入 readable → 把对应文件从 loadingItems 移到 loadedItems
    - 否则(readable=null 且 markAll=true)→ 把全部 loadingItems 移过去
InputProgressDeser.java
旧版
  Set<InputItemProgress> loadedItems;
InputItemProgress loadingItem;
  用 Set 存储已完成的 items,用单对象存储正在加载的 item。
  新版
  Map<String, InputItemProgress> loadedItems;
Map<String, InputItemProgress> loadingItems;
  改成 Map(key 是字符串,比如文件名/ID),既能保持唯一性又能快速索引,还支持多个并发 "loading items"。
 并且使用了:
  Collections.synchronizedMap(InsertionOrderUtil.newMap());
  来保证线程安全 + 保留插入顺
@github-actions github-actions bot added the loader hugegraph-loader label Sep 16, 2025
@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Sep 16, 2025
@codecov
Copy link
Copy Markdown

codecov bot commented Sep 30, 2025

Codecov Report

❌ Patch coverage is 31.47335% with 1093 lines in your changes missing coverage. Please review.
✅ Project coverage is 51.72%. Comparing base (b066b80) to head (b14ec5f).
⚠️ Report is 56 commits behind head on master.

Files with missing lines Patch % Lines
...a/org/apache/hugegraph/loader/HugeGraphLoader.java 30.24% 209 Missing and 17 partials ⚠️
...che/hugegraph/loader/reader/graph/GraphReader.java 0.00% 107 Missing ⚠️
...pache/hugegraph/loader/builder/ElementBuilder.java 43.67% 76 Missing and 13 partials ⚠️
.../apache/hugegraph/loader/filter/ShortIdParser.java 0.00% 72 Missing ⚠️
...ph/loader/filter/util/VertexLabelBuilderProxy.java 0.00% 61 Missing ⚠️
...org/apache/hugegraph/loader/util/DataTypeUtil.java 38.55% 47 Missing and 4 partials ⚠️
...e/hugegraph/loader/reader/hdfs/HDFSFileReader.java 51.80% 35 Missing and 5 partials ⚠️
...he/hugegraph/loader/filter/ElementLimitFilter.java 0.00% 39 Missing ⚠️
...he/hugegraph/loader/filter/util/ShortIdConfig.java 0.00% 39 Missing ⚠️
...he/hugegraph/loader/reader/graph/GraphFetcher.java 0.00% 36 Missing ⚠️
... and 34 more
Additional details and impacted files
@@              Coverage Diff              @@
##             master     #683       +/-   ##
=============================================
- Coverage     62.49%   51.72%   -10.78%     
- Complexity     1903     2059      +156     
=============================================
  Files           262      335       +73     
  Lines          9541    12520     +2979     
  Branches        886     1159      +273     
=============================================
+ Hits           5963     6476      +513     
- Misses         3190     5573     +2383     
- Partials        388      471       +83     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@imbajin imbajin moved this to 👀 In review in HugeGraph Tasks Oct 24, 2025
.map(item -> item.reader)
.collect(Collectors.toSet());
for (InputReader r : readers) {
if (!usedReaders.contains(r)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Important: Reader 资源清理逻辑不完善

prepareTaskItems() 的 finally 块中:

} finally {
    Set<InputReader> usedReaders = tasks.stream()
                                        .map(item -> item.reader)
                                        .collect(Collectors.toSet());
    for (InputReader r : readers) {
        if (!usedReaders.contains(r)) {
            try {
                r.close();
            } catch (Exception ex) {
                LOG.warn("Failed to close reader: {}", ex.getMessage());
            }
        }
    }
}

问题:

  1. 只关闭了 "未使用" 的 reader,但使用中的 reader 在哪里关闭?
  2. 如果 reader.init() 失败,reader 可能处于半初始化状态,仍需清理
  3. 异常被吞掉(只记录warn),可能隐藏重要的资源释放失败

建议:

  • 明确 reader 的生命周期管理责任
  • 使用 try-with-resources 或确保在任务完成后统一清理
  • 考虑是否需要一个 Reader 注册表来跟踪所有创建的 reader

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用中的reader在asyncLoadStruct中清理

.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(selectedVertexLabels)) {
vertexLabels =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Important: Graph-to-Graph schema 同步缺少完整性验证

createGraphSourceLabels() 中:

Set<String> existedPKs =
        targetClient.schema().getPropertyKeys().stream()
                    .map(pk -> pk.name()).collect(Collectors.toSet());

for (String pkName : label.properties()) {
    PropertyKey pk = sourceClient.schema().getPropertyKey(pkName);
    if (!existedPKs.contains(pk.name())) {
        targetClient.schema().addPropertyKey(pk);
    }
}

问题:

  1. 只检查了 PropertyKey 的名称,没有验证数据类型是否一致
  2. 如果目标图中已存在同名但类型不同的 PropertyKey,会导致数据不兼容
  3. 没有处理 PropertyKey 的其他属性(如 Cardinality)

建议:

  • 比对 PropertyKey 的完整定义,包括 dataType、cardinality 等
  • 如果存在不兼容的 schema,给出明确的错误提示
  • 考虑添加强制覆盖选项

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be necessary

@imbajin imbajin requested a review from Copilot October 27, 2025 08:17
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 69 out of 70 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (2)

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java:1

  • Corrected duplicate word 'the' in error message.
/*

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:1

  • Corrected spelling of 'avaliable' to 'available' in error message (located in HugeClientHolder.java).
/*

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@imbajin imbajin requested a review from Copilot October 28, 2025 04:53
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 69 out of 70 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (2)

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/ElementBuilder.java:1

  • Direct use of internal implementation class BuilderImpl instead of the interface. This creates tight coupling to internal APIs that may change.
/*

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java:1

  • Remove duplicate 'the' in error message.
/*

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@imbajin
Copy link
Copy Markdown
Member

imbajin commented Oct 28, 2025

代码审查意见

感谢提交这个重要的PR!这是一个重大的重构,引入了并发加载、Graph-to-Graph迁移等重要功能。以下是我的审查意见:


‼️ Critical Issues (高优先级)

1. main方法异常处理不当 (HugeGraphLoader.java:115)

} catch (Throwable e) {
    Printer.printError("Failed to start loading", e);
    return;  // ❌ 应该使用 System.exit(1)
}

问题: 加载失败时仅return,未设置退出码,导致调用方(shell脚本、CI/CD)无法检测到失败。

建议:

} catch (Throwable e) {
    Printer.printError("Failed to start loading", e);
    System.exit(1);
}

2. 重复异常处理 (HugeGraphLoader.java:224)

clearAllDataIfNeeded()方法中,捕获异常后记录日志,然后又抛出异常,导致异常被处理两次。

建议: 要么只记录不抛出,要么只抛出不记录,或者重新包装为更具体的异常类型。


3. setGraphMode()逻辑冗余 (HugeGraphLoader.java:173-189)

options.restore为true且所有inputs都是Graph类型时,会调用两次setRestoreMode()

建议:

private void setGraphMode() {
    List<InputSource> inputs = this.mapping.structs().stream()
        .filter(struct -> !struct.skip())
        .map(InputStruct::input)
        .collect(Collectors.toList());

    boolean hasGraphSource = inputs.stream()
        .anyMatch(input -> SourceType.GRAPH.equals(input.type()));
    
    if (hasGraphSource && !inputs.stream()
            .allMatch(input -> SourceType.GRAPH.equals(input.type()))) {
        throw new LoadException("All inputs must be of Graph Type");
    }

    if (hasGraphSource || this.options.restore) {
        this.context().setRestoreMode();
    } else {
        this.context().setLoadingMode();
    }
}

4. 类型安全问题 (createGraphSourceLabels方法)

使用泛型List<? extends SchemaLabel>后强制类型转换,运行时可能抛出ClassCastException。

建议: 拆分为两个类型安全的方法:

private void createGraphSourceVertexLabels(
        HugeClient sourceClient,
        HugeClient targetClient,
        List<VertexLabel> labels,
        Map<String, GraphSource.SeletedLabelDes> selectedMap,
        Map<String, GraphSource.IgnoredLabelDes> ignoredMap) {
    // ...处理VertexLabel的逻辑
}

private void createGraphSourceEdgeLabels(
        HugeClient sourceClient,
        HugeClient targetClient,
        List<EdgeLabel> labels,
        Map<String, GraphSource.SeletedLabelDes> selectedMap,
        Map<String, GraphSource.IgnoredLabelDes> ignoredMap) {
    // ...处理EdgeLabel的逻辑
}

⚠️ Important Issues (中优先级)

5. load()方法返回值语义变更 (HugeGraphLoader.java:229)

return this.context.noError();改为return true;是一个破坏性变更,会影响所有依赖此返回值的调用方。

建议:

  • 如果不再需要返回错误状态,将返回类型改为void
  • 或者恢复原逻辑:return this.context.noError();
  • 或在JavaDoc中明确说明此行为变更

6. Stream重复创建 (setGraphMode方法)

inputsSupplier.get()被调用多次,每次创建新的Stream,影响性能且可能导致逻辑错误。

建议: 见上面第3点的优化代码。


7. 资源泄漏风险 (prepareTaskItems方法:598-652)

prepareTaskItems抛出异常时,已创建的readers可能不会被关闭。

建议:

private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs, boolean scatter) {
    ArrayList<InputTaskItem> tasks = new ArrayList<>();
    List<InputReader> allReaders = new ArrayList<>();
    
    try {
        for (InputStruct struct : structs) {
            // ...创建reader的代码...
            allReaders.addAll(readerList);
            // ...
        }
        return tasks;
    } finally {
        Set<InputReader> usedReaders = tasks.stream()
            .map(item -> item.reader)
            .collect(Collectors.toSet());
        for (InputReader r : allReaders) {
            if (!usedReaders.contains(r)) {
                try {
                    r.close();
                } catch (Exception ex) {
                    LOG.warn("Failed to close reader", ex);
                }
            }
        }
    }
}

8. 线程池未在异常情况下关闭 (loadStructs方法)

如果prepareTaskItems()或任务执行抛出异常,loadService不会被shutdown,导致线程泄漏。

建议:

ExecutorService loadService = null;
try {
    loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader");
    // ... 执行任务 ...
} finally {
    if (loadService != null) {
        loadService.shutdown();
        try {
            if (!loadService.awaitTermination(60, TimeUnit.SECONDS)) {
                loadService.shutdownNow();
            }
        } catch (InterruptedException e) {
            loadService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

🧹 Minor Issues (低优先级)

9. 封装性 (InputTaskItem类)

所有字段都是public final,违反封装原则。建议使用private final + getter或使用record(Java 14+)。


Positive Aspects

  1. ✨ 引入并发加载机制,显著提升性能
  2. ✨ 添加Graph-to-Graph迁移支持,功能实用
  3. ✨ 短ID支持通过反射注入,设计巧妙
  4. ✨ 改进的错误处理和日志记录

📋 测试建议

请确保涵盖以下测试场景:

  1. 并发测试: 高负载下的线程安全性
  2. Graph源测试: Graph-to-Graph迁移场景验证
  3. 异常恢复: 各种错误条件下的失败处理
  4. 性能基准: 与之前实现的吞吐量对比
  5. 资源泄漏测试: 异常情况下的资源清理

总体来说这是一个很有价值的重构,但需要解决上述关键问题以确保代码的健壮性和可维护性。

Copy link
Copy Markdown
Contributor

@Thespica Thespica left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thank you @sadwitdastreetz !

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Oct 29, 2025
@imbajin imbajin merged commit 8a936a2 into apache:master Oct 29, 2025
17 of 18 checks passed
@github-project-automation github-project-automation bot moved this from 👀 In review to ✅ Done in HugeGraph Tasks Oct 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

lgtm This PR has been approved by a maintainer loader hugegraph-loader size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

4 participants