
Replace DataCarrier with BatchQueue for metrics pipeline#13703

Open
wu-sheng wants to merge 20 commits into master from library-batch-queue

Conversation

@wu-sheng
Member

@wu-sheng wu-sheng commented Feb 15, 2026

New feature: library-batch-queue module

  • Update the documentation to include this new feature.
  • Tests (including UT, IT, and E2E) are added to verify the new feature.

A partitioned, self-draining queue with type-based dispatch. Designed to replace DataCarrier in high-fan-out scenarios.

Key features:

  • One queue per concern, many types per queue. OAL + MAL metric types share unified queues instead of separate pools. Individual metric types register handlers; the queue groups drained items by class and dispatches to matching handlers. Queues are shared via BatchQueueManager.getOrCreate(name, config) — the first caller creates the queue, subsequent callers with the same name get the existing instance and register their type handlers.
  • Adaptive partitioning. Partition count grows automatically with addHandler() calls (~330 partitions for typical OAL+MAL workload on 8 threads), giving each metric type its own lock-free ArrayBlockingQueue.
  • Idle backoff. Drain loops double their sleep interval when idle (minIdleMs * 2^count, capped at maxIdleMs), resetting on first non-empty drain. No busy-waiting.
  • Hardware-aware thread sizing. ThreadPolicy resolves thread count at runtime: cpuCores(1.0), cpuCoresWithBase(1, 0.25), or fixed(N).
  • Throughput-weighted drain rebalancing. DrainBalancer.throughputWeighted() periodically reassigns partitions to equalize per-thread load when metric types have skewed throughput. Enabled for L1 and L2 with 10-second interval.
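The shared-queue dispatch in the first bullet can be sketched as a minimal, self-contained illustration. Names such as `TypeDispatchSketch` and `drainOnce` are invented here for illustration and are not the real BatchQueue API:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Minimal illustration of type-based dispatch: several metric types share one
// queue; a drain pass groups buffered items by class and hands each group to
// the handler registered for that class. All names here are illustrative.
public class TypeDispatchSketch {
    static class ServiceCpm { }
    static class EndpointLatency { }

    static final Queue<Object> QUEUE = new ArrayDeque<>();
    static final Map<Class<?>, Consumer<List<Object>>> HANDLERS = new HashMap<>();

    static void addHandler(Class<?> type, Consumer<List<Object>> handler) {
        HANDLERS.put(type, handler);
    }

    static void drainOnce() {
        // Group everything currently buffered by concrete class ...
        Map<Class<?>, List<Object>> byType = QUEUE.stream()
                .collect(Collectors.groupingBy(Object::getClass));
        QUEUE.clear();
        // ... and dispatch each batch to the handler registered for its type.
        byType.forEach((type, batch) -> HANDLERS.get(type).accept(batch));
    }

    public static void main(String[] args) {
        addHandler(ServiceCpm.class,
                batch -> System.out.println("ServiceCpm batch: " + batch.size()));
        addHandler(EndpointLatency.class,
                batch -> System.out.println("EndpointLatency batch: " + batch.size()));

        QUEUE.add(new ServiceCpm());
        QUEUE.add(new EndpointLatency());
        QUEUE.add(new ServiceCpm());
        drainOnce(); // three items, two batches, one handler call per type
    }
}
```

In the real module, per the description above, `BatchQueueManager.getOrCreate(name, config)` returns the shared queue and each metric type registers its own handler via `addHandler()`.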

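The idle-backoff bullet above (minIdleMs * 2^count, capped at maxIdleMs, reset on a non-empty drain) can be sketched as a small state machine. Class and method names are illustrative, not the real BatchQueue API:

```java
// Sketch of the drain loop's idle backoff: each consecutive empty drain
// doubles the sleep (minIdleMs * 2^count, capped at maxIdleMs); the first
// non-empty drain resets the counter so the loop reacts quickly again.
public class IdleBackoffSketch {
    private final long minIdleMs;
    private final long maxIdleMs;
    private int idleCount;

    public IdleBackoffSketch(long minIdleMs, long maxIdleMs) {
        this.minIdleMs = minIdleMs;
        this.maxIdleMs = maxIdleMs;
    }

    /** Sleep duration for the next idle cycle; doubles per consecutive idle drain. */
    public long nextIdleSleepMs() {
        // The shift is clamped to avoid long overflow during very long idle periods.
        long sleep = Math.min(maxIdleMs, minIdleMs << Math.min(idleCount, 30));
        idleCount++;
        return sleep;
    }

    /** Called after a non-empty drain: backoff restarts from minIdleMs. */
    public void reset() {
        idleCount = 0;
    }
}
```

With minIdleMs = 5 and maxIdleMs = 50 the sequence is 5, 10, 20, 40, 50, 50, ... until a non-empty drain resets it to 5; the loop sleeps instead of busy-waiting.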
Adaptive partition mode (PartitionPolicy.adaptive())

Used by L1 aggregation, L2 persistence, and TopN queues. The partition count grows as metric type handlers are registered via addHandler(), rather than being fixed at construction time.

The algorithm uses a threshold = threadCount * multiplier (default multiplier = 25). With 8 drain threads the threshold is 200:

  • Below threshold (handlers ≤ 200): one partition per handler (1:1 mapping). Each metric type gets its own dedicated ArrayBlockingQueue, eliminating contention between types.
  • Above threshold (handlers > 200): excess handlers share partitions at 1:2 ratio: threshold + (handlerCount - threshold) / 2. This avoids unbounded partition growth while keeping most types isolated.

Examples with 8 threads (threshold = 200):

| Registered handlers | Partitions | Ratio |
|---|---|---|
| 0 (initial) | 8 | = threadCount |
| 100 | 100 | 1:1 |
| 200 | 200 | 1:1, at threshold |
| 500 | 350 | 200 + 300/2 |
| 1000 | 600 | 200 + 800/2 |

In practice, a typical OAL + MAL deployment has ~460 metric types, producing ~330 partitions on 8 threads (threshold 200, so 200 + (460-200)/2 = 330).
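The formula above can be written out as a small function. This is a sketch of the arithmetic only, not the real PartitionPolicy code; the method name is invented here:

```java
// The adaptive partition-count formula: 1:1 below the threshold, 1:2 sharing
// for the excess above it, and one partition per drain thread before any
// handler is registered. threshold = threadCount * multiplier (default 25).
public final class AdaptivePartitionSketch {
    public static int partitions(int handlerCount, int threadCount, int multiplier) {
        int threshold = threadCount * multiplier;          // e.g. 8 * 25 = 200
        if (handlerCount == 0) {
            return threadCount;                            // initial: one per drain thread
        }
        if (handlerCount <= threshold) {
            return handlerCount;                           // 1:1, each type isolated
        }
        return threshold + (handlerCount - threshold) / 2; // 1:2 sharing for the excess
    }

    public static void main(String[] args) {
        // Reproduces the table rows and the ~460-type OAL+MAL example.
        System.out.println(partitions(500, 8, 25));  // 350
        System.out.println(partitions(460, 8, 25));  // 330
    }
}
```

With 8 threads and the default multiplier of 25 this reproduces every row of the table above.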

Thread policies

ThreadPolicy.cpuCores(multiplier) — Threads = round(multiplier × availableProcessors()), minimum 1. Scales with hardware: same config runs 8 threads on 8-core, 16 on 16-core.

Used by L1 aggregation with cpuCores(1.0) — one thread per core. L1 is CPU-bound (merging metrics in memory) and benefits from maximum parallelism.

ThreadPolicy.cpuCoresWithBase(base, multiplier) — Threads = base + round(multiplier × availableProcessors()), minimum 1. Provides a guaranteed minimum (base) plus hardware-proportional scaling.

Used by L2 persistence with cpuCoresWithBase(1, 0.25) — 1 + ¼ of cores. On 8-core = 3 threads, on 16-core = 5 threads, on 24-core = 7 threads. L2 is I/O-bound (writing to storage); fewer threads are needed but at least 1 is always guaranteed even on minimal hardware.

ThreadPolicy.fixed(N) — Exactly N threads regardless of hardware.

Used by TopN (1 thread), exporters (1 thread each), gRPC remote client (1 thread per peer), JDBC batch (configurable, default 4).
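The three policies reduce to simple arithmetic. The sketch below passes the core count in explicitly so the numbers are checkable; the real ThreadPolicy resolves `Runtime.availableProcessors()` at runtime, and the class name here is invented:

```java
// Thread-count arithmetic for the three policies described above.
public final class ThreadPolicySketch {
    public static int cpuCores(double multiplier, int cores) {
        return Math.max(1, (int) Math.round(multiplier * cores));
    }

    public static int cpuCoresWithBase(int base, double multiplier, int cores) {
        return Math.max(1, base + (int) Math.round(multiplier * cores));
    }

    public static int fixed(int n) {
        return n; // hardware-independent
    }

    public static void main(String[] args) {
        System.out.println(cpuCores(1.0, 8));              // L1 aggregation on 8-core: 8
        System.out.println(cpuCoresWithBase(1, 0.25, 8));  // L2 persistence on 8-core: 3
        System.out.println(cpuCoresWithBase(1, 0.25, 16)); // L2 persistence on 16-core: 5
        System.out.println(fixed(1));                      // TopN persistence: 1
    }
}
```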

Improve the performance of metrics aggregation and persistence pipeline

  • Add a benchmark for the improvement.
  • The benchmark result.
  • Links/URLs to the theory proof or discussion articles/blogs: LPT (Longest Processing Time) heuristic — see Wikipedia: Multiprocessor scheduling.

Thread count reduction from 36 to 15 on an 8-core machine (gRPC remote client excluded — unchanged 1 thread per peer).

| Queue | Old threads | New threads | New policy |
|---|---|---|---|
| L1 Aggregation (OAL+MAL unified) | 26 | 8 | cpuCores(1.0) |
| L2 Persistence (OAL+MAL unified) | 3 | 3 | cpuCoresWithBase(1, 0.25) |
| TopN Persistence | 4 | 1 | fixed(1) |
| Exporters (gRPC/Kafka) | 3 | 3 | fixed(1) each |
| Total | 36 | 15 | |

Throughput: BatchQueue vs DataCarrier (ideal — uniform load across types)

Benchmark config: 32 producers, fixed(8) drain threads, IF_POSSIBLE strategy.
Metric types simulated with 2000 distinct classes, each carrying a boxed Long payload.

| Partition strategy | 500 types | 1000 types | 2000 types |
|---|---|---|---|
| DataCarrier baseline | 33.4M | 37.6M | 38.0M |
| BatchQueue adaptive | 45.7M (+37%) | 50.5M (+34%) | 64.0M (+68%) |
| BatchQueue 1:1 | 51.3M (+53%) | 61.2M (+63%) | 75.7M (+99%) |

Throughput: static vs rebalanced (OAL/MAL skewed load)

Real OAL/MAL workloads have highly skewed entity counts — endpoint-scoped metrics (~24 entities) dominate
over service-scoped (~4 entities) and MAL (~1 entity). Static round-robin partition assignment creates
thread imbalance.

DrainBalancer.throughputWeighted() periodically snapshots per-partition throughput counters,
sorts partitions by load descending, and assigns each to the least-loaded thread (LPT heuristic).
A two-phase handoff (revoke → cycle-count fence → assign) prevents concurrent handler invocations
during reassignment. Enabled for L1 aggregation and L2 persistence queues with a 10-second rebalance interval.
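The LPT assignment step can be sketched as a greedy loop. This shows only the placement heuristic; the two-phase revoke/fence/assign handoff and the throughput-counter snapshots of the real DrainBalancer are omitted, and `LptSketch`/`assign` are invented names:

```java
import java.util.Arrays;
import java.util.Comparator;

// LPT (Longest Processing Time) placement: sort partitions by measured
// throughput descending, then give each one to the currently least-loaded
// drain thread. Greedy LPT is a classic multiprocessor-scheduling heuristic.
public final class LptSketch {
    /** Returns assignment[i] = drain-thread index for partition i. */
    public static int[] assign(long[] partitionLoad, int threads) {
        Integer[] order = new Integer[partitionLoad.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        // Heaviest partitions are placed first.
        Arrays.sort(order,
                Comparator.comparingLong((Integer i) -> partitionLoad[i]).reversed());

        long[] threadLoad = new long[threads];
        int[] assignment = new int[partitionLoad.length];
        for (int p : order) {
            int least = 0; // the thread with the smallest accumulated load so far
            for (int t = 1; t < threads; t++) {
                if (threadLoad[t] < threadLoad[least]) {
                    least = t;
                }
            }
            assignment[p] = least;
            threadLoad[least] += partitionLoad[p];
        }
        return assignment;
    }
}
```

For example, partition loads {9, 7, 6, 5, 4, 3} on 2 threads come out perfectly balanced at 17 per thread, whereas static round-robin on the same input would give 19 vs 15.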

Benchmark config: 4 drain threads, 16 producers, 100 types with entity-count-driven skew, 500 LCG iterations per item.

                    Static          Rebalanced
  Throughput:    7,211,794         8,729,310  items/sec
  Load ratio:       1.30x             1.04x  (max/min thread)
  Improvement:                       +21.0%

Stability over 20 seconds (sampled every 2s after initial rebalance):

  Interval    Throughput      Ratio
   0- 2s     8,915,955       1.00x
   2- 4s     8,956,595       1.01x
   4- 6s     8,934,778       1.00x
   6- 8s     8,838,461       1.01x
   8-10s     8,887,092       1.00x
  10-12s     8,844,614       1.00x
  12-14s     8,877,651       1.00x
  14-16s     8,851,595       1.01x
  16-18s     8,639,045       1.01x
  18-20s     8,708,210       1.01x
  Stable: YES (avg ratio 1.01x)

Other changes

  • Add a named ThreadFactory to every anonymous Executors thread pool for easier thread-dump analysis.

  • Remove library-datacarrier-queue module. All usages replaced by library-batch-queue.

  • Update the CHANGES log.

@wu-sheng wu-sheng added this to the 10.4.0 milestone Feb 15, 2026
@wu-sheng wu-sheng added core feature Core and important feature. Sometimes, break backwards compatibility. enhancement Enhancement on performance or codes complexity:high Relate to multiple(>4) components of SkyWalking labels Feb 15, 2026

Copilot AI left a comment


Pull request overview

This PR introduces a new library-batch-queue module and migrates OAP server internal pipelines from DataCarrier to BatchQueue, aiming to reduce thread counts, unify OAL/MAL queues, and improve observability via named executor threads. It also updates OTel rules and Grafana/So11y dashboards to match the new queue metrics dimensions and removes the legacy library-datacarrier-queue module.

Changes:

  • Add library-batch-queue (partitioned, self-draining queue) and replace DataCarrier usages across aggregation/persistence/TopN/exporters/remote client.
  • Name anonymous executor threads across the OAP server for easier thread-dump analysis.
  • Update telemetry rules and dashboards for the new queue metrics tags/dimensions; remove library-datacarrier-queue and related tests.

Reviewed changes

Copilot reviewed 98 out of 98 changed files in this pull request and generated 2 comments.

File Description
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java Switch JDBC async batching from DataCarrier to BatchQueue.
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml Replace datacarrier dependency with library-batch-queue.
oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml Remove unused library-datacarrier-queue dependency.
oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json Update So11y dashboard expressions/titles for unified queues and new labels.
oap-server/server-starter/src/main/resources/otel-rules/oap.yaml Update OTel aggregation rule dimensions (remove kind, add slot).
oap-server/server-library/pom.xml Remove library-datacarrier-queue, add library-batch-queue module.
oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/benchmark/StringFormatGroupBenchmark.java Add benchmark-style test for StringFormatGroup.
oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java Name the scheduled executor thread for file monitoring.
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/partition/SimpleRollingPartitionerTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/partition/ProducerThreadPartitionerTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumeDriverTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/SampleData.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/EnvUtilTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/SimpleRollingPartitioner.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/ProducerThreadPartitioner.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/IDataPartitioner.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IDriver.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactory.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPool.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumeDriver.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/common/AtomicRangeInteger.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/QueueBuffer.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/EnvUtil.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrier.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java Name the health-check scheduled executor thread.
oap-server/server-library/library-batch-queue/src/test/resources/log4j2-test.xml Add log4j2 test config for the new module tests.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java Add unit tests for ThreadPolicy.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java Add unit tests for PartitionPolicy.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java Add benchmark helper for dispatch testing.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManagerTest.java Add tests for queue registry/shared scheduler semantics.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java Add tests for config validation/defaults.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java Introduce thread-count policy abstraction.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/QueueErrorHandler.java Add error handler functional interface in batchqueue package.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionSelector.java Add partition-selection strategy API.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java Add partition-count policy abstraction (fixed/threadMultiply/adaptive).
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/HandlerConsumer.java Define handler consumer contract and idle hook.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BufferStrategy.java Define buffer backpressure/drop strategies.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java Add queue usage snapshot/top-N partition reporting.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManager.java Add global queue registry + shared scheduler ref-counting.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java Add queue config builder + validation.
oap-server/server-library/library-batch-queue/pom.xml Rename module artifact + add test dependencies.
oap-server/server-library/library-batch-queue/DESIGN.md Document drain rebalancing design.
oap-server/server-library/library-batch-queue/CLAUDE.md Add module-specific assistant documentation.
oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java Name the BanyanDB channel manager scheduler thread.
oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java Name the health checker scheduler thread.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java Update tests for new GRPCRemoteClient ctor signature.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java Update tests for new GRPCRemoteClient ctor signature.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/RegexVSQuickMatchBenchmark.java Add benchmark-style tests for endpoint grouping performance.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/EndpointGrouping4OpenapiBenchmark.java Add benchmark-style tests for OpenAPI grouping reader/formatter.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/EBPFProfilingAnalyzerBenchmark.java Add benchmark-style tests for EBPF profiling analyzer.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java Name the watermark watcher scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java Name the TTL keeper scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java Name persistence timer threads (scheduler + prepare pool).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java Name remote client manager scheduler thread; update GRPCRemoteClient ctor usage.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java Replace DataCarrier with per-peer BatchQueue.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java Name EBPF fetch thread pool threads.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java Name hierarchy auto-matching scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java Simplify exporter service interface (remove start()).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java Name endpoint URI recognition scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java Name cache update scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java Switch TopN persistence to shared BatchQueue + per-type handler.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java Pass TopN class into worker to register handler.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java Unify OAL/MAL aggregation & persistence workers onto shared queues.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java Replace L2 persistence DataCarrier with unified BatchQueue + queue metrics.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java Remove OAL-specific L2 worker (unified queue).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java Remove MAL-specific L2 worker (unified queue).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java Replace L1 DataCarrier pools with unified BatchQueue + queue metrics.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java Remove OAL-specific L1 worker (unified queue).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java Remove MAL-specific L1 worker (unified queue).
oap-server/server-core/pom.xml Replace datacarrier dependency with library-batch-queue.
oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java Name the alarm scheduler thread.
oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerV2.java Remove microbench datacarrier artifacts (module removal).
oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerV1.java Remove microbench datacarrier artifacts (module removal).
oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerBenchmark.java Remove microbench datacarrier artifacts (module removal).
oap-server/microbench/pom.xml Drop datacarrier dependency from microbench module.
oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java Adjust tests for new exporter queue lifecycle and method names.
oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java Update config assertions for new buffer setting field.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java Replace exporter DataCarrier with BatchQueue.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java Replace exporter DataCarrier with BatchQueue.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java Replace exporter DataCarrier with BatchQueue.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java Simplify exporter buffer config to a single bufferSize.
oap-server/exporter/pom.xml Replace datacarrier dependency with library-batch-queue.
docs/en/setup/backend/grafana-instance.json Update Grafana instance dashboard panels/labels for unified queues and slot.
docs/en/setup/backend/grafana-cluster.json Update Grafana cluster dashboard panels/labels for unified queues and slot.
docs/en/changes/changes.md Document new module, migration details, and thread naming inventory.


@wu-sheng wu-sheng changed the title Replace DataCarrier with BatchQueue; name all thread pools Replace DataCarrier with BatchQueue for metrics pipeline Feb 15, 2026
wu-sheng and others added 5 commits February 15, 2026 22:55
Introduce BatchQueue as a new queue implementation for metrics aggregation
with type-based dispatch, configurable partition selection, and adaptive
backoff. Includes comprehensive unit tests, throughput benchmarks, and
DataCarrier comparison benchmarks. Also migrates existing microbench
benchmarks out of the JMH-based microbench module into standard JUnit tests.
Replace DataCarrier with BatchQueue for L1 metrics aggregation,
L2 metrics persistence, TopN persistence, all three exporters
(gRPC metrics, Kafka trace, Kafka log), and gRPC remote client.
Remove the library-datacarrier-queue module entirely.

All metric types (OAL + MAL) now share unified queues instead of
separate pools. Thread count reduced from 36 to 15 on 8-core.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CLAUDE.md documents the module's design principles, architecture,
dispatch modes, scheduler modes, key classes, and current usage
across the codebase.

DESIGN.md describes the throughput-weighted partition rebalancing
feature: two-phase handoff protocol with cycle-count fencing for
safe concurrent-free reassignment, targeting L2 persistence (primary)
and L1 aggregation (secondary) queues.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace default pool-N-thread-M naming with descriptive thread names
across all Executors.newXxx() calls for easier thread dump analysis.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Enable periodic partition-to-thread reassignment based on drain
throughput to equalize load when metric types have skewed volume.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@wu-sheng wu-sheng force-pushed the library-batch-queue branch from 516da79 to 7d69bbf Compare February 15, 2026 14:59
wu-sheng and others added 6 commits February 21, 2026 17:19
…uler

OAP server crashed at startup with "BatchQueue [TOPN_PERSISTENCE] already
exists" because TopNWorker, MetricsAggregateWorker, and MetricsPersistentMinWorker
each called BatchQueueManager.create() with the same queue name for every
metric/TopN type, but create() throws on duplicate names.

Add BatchQueueManager.getOrCreate() for shared queues where multiple workers
register handlers on the same queue. The first caller creates the queue;
subsequent callers get the existing instance.

Remove the sharedScheduler infrastructure (shared ScheduledExecutorService
with reference counting) since all queues now use dedicated schedulers.
The getOrCreate() pattern with addHandler() provides the same sharing
semantics with better isolation (DrainBalancer, adaptive partitions).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
onIdle() was previously broadcast to all handlers from every drain
thread, causing concurrent onIdle() calls on the same handler. Now
only handlers whose partition is owned by the calling drain task
receive onIdle(), consistent with the consume() ownership model.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…lysis

Introduce a benchmarks/ directory with reusable environment setup and
benchmark case infrastructure. The framework separates environment
provisioning from benchmark execution, allowing the same case to run
against different topologies.

Environment (cluster_oap-banyandb):
- 2-node OAP cluster with BanyanDB on Kind (K8s 1.34)
- Istio ALS (Access Log Service) for telemetry collection
- Bookinfo sample app with Envoy sidecars at ~5 RPS
- Deployed via SkyWalking Helm chart with automatic image pre-loading
- Cluster health check via remote_out_count Prometheus metric
- Centralized version config in benchmarks/env

Case (thread-analysis):
- Collects periodic OAP thread dumps via kill -3 (SIGQUIT)
- Monitors metrics via swctl in the background
- Produces per-pod thread pool trend tables (OAP threads only)
- Highlights differences between pods in a comparison section
- Normalizes pool names (IPs, hashcodes, HttpClient instances)

Runner (run.sh):
- Two modes: setup-only and full run (setup + case)
- Automatic cleanup after run mode (Kind cluster + Docker prune)
- Error-only cleanup in setup mode

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@wu-sheng
Member Author

Benchmark: BatchQueue Thread Pool Analysis

Ran the thread dump benchmark (benchmarks/run.sh run cluster_oap-banyandb thread-analysis) against commit 0518b55 on a 2-node OAP cluster with Istio ALS + Bookinfo (~5 RPS).

Environment

  • OAP: 2 replicas, BanyanDB standalone, Istio ALS (k8s-mesh)
  • JRE: OpenJDK 11.0.30 Temurin (aarch64, Ubuntu 24.04)
  • K8s: Kind 0.31.0, K8s 1.34
  • Traffic: Bookinfo sample app via Envoy sidecars, ~5 RPS

BatchQueue Thread Pools

Both OAP pods show identical, stable BatchQueue thread counts across all 5 dump rounds (60s apart):

| BatchQueue Pool | Threads | States | Notes |
|---|---|---|---|
| BatchQueue-METRICS_L1_AGGREGATION | 10 | WAITING(9), TIMED_WAITING(1) | 9 consumer threads idle, 1 producer ticking |
| BatchQueue-METRICS_L2_PERSISTENCE | 4 | WAITING(2-3), TIMED_WAITING(1-2) | Stable at 4 across all rounds |
| BatchQueue-TOPN_PERSISTENCE | 1 | TIMED_WAITING(1) | Single thread, idle |
| BatchQueue-GRPC_REMOTE_* | 1 | TIMED_WAITING(1) | One per remote OAP node |

Thread count trend (identical on both pods):

Pool Name                              #1    #2    #3    #4    #5
BatchQueue-GRPC_REMOTE_*                1     1     1     1     1
BatchQueue-METRICS_L1_AGGREGATION      10    10    10    10    10
BatchQueue-METRICS_L2_PERSISTENCE       4     4     4     4     4
BatchQueue-TOPN_PERSISTENCE             1     1     1     1     1

Key Observations

  1. Thread counts are perfectly stable — no growth or leaks across 5 rounds (~5 min total)
  2. Both pods are symmetric — identical BatchQueue thread counts
  3. Consumer threads are properly idle — mostly in WAITING/TIMED_WAITING, no busy-spinning
  4. Total OAP thread count: Pod 1 settled at ~275-302 threads, Pod 2 at ~84-101 threads (the difference comes from the armeria-common-blocking-tasks pool, which is active only on Pod 1 because it handles more gRPC traffic)
  5. 16 BatchQueue threads per pod (10 + 4 + 1 + 1) for the metrics pipeline under ~5 RPS load

wu-sheng and others added 2 commits February 22, 2026 12:09
…ter_oap-banyandb

- Use a shared static HttpClient singleton instead of creating a new instance
  per alarm post() call, which leaked NIO selector threads.
- Rename benchmark environment from cluster_oap-banyandb to
  istio-cluster_oap-banyandb to reflect its Istio-based setup.
- Add changelog entries for both the fix and the benchmark framework.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment on lines -46 to +51
```diff
-final var response = HttpClient
-    .newBuilder()
-    .followRedirects(HttpClient.Redirect.NORMAL)
-    .build()
+final var response = HTTP_CLIENT
```
Member Author


This was a client leak.

wu-sheng and others added 4 commits February 22, 2026 19:35
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add SharedKubernetesClient enum singleton with KubernetesHttpClientFactory
  that uses virtual threads on JDK 25+ or a single fixed thread on JDK <25.
- Replace all KubernetesClientBuilder().build() calls (9 sites across 7 files)
  with the shared instance to eliminate per-call thread churn.
- Fix KubernetesCoordinator client leak (never closed, selector thread persisted).
- Consolidate kubernetes-client dependencies in query-graphql-plugin and
  configuration-k8s-configmap to use library-kubernetes-support.
- Fix benchmark health check to use curlimages/curl pod (OAP JRE 25 image
  does not include curl).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rison

Reduce Armeria HTTP server event loop threads from default (cores*2=20) to
max(5, cores/4) with a shared EventLoopGroup across all 4 HTTP servers.
HTTP traffic (UI, PromQL, LogQL) is much lighter than gRPC.

Add thread count benchmark comparison table (v10.3.0 vs v10.4.0) in changelog
showing ~50% reduction from 150+ to ~72 OAP threads on a 10-core machine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment on lines +43 to +48
```java
SharedKubernetesClient() {
    client = new KubernetesClientBuilder()
        .withHttpClientFactory(new KubernetesHttpClientFactory())
        .build();
    Runtime.getRuntime().addShutdownHook(
        new Thread(client::close, "K8sClient-shutdown"));
```
Member Author

This is another thread leak fix. The HTTP client (K8s client) was never closed properly in the k8s coordinator; meanwhile, the other K8s client use cases created clients heavily and closed them only slowly (by GC alone).
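A minimal sketch of the enum-singleton-plus-shutdown-hook pattern used here — the resource is a stand-in, not the real Kubernetes client API:

```java
public enum SharedResource {
    INSTANCE;

    private final AutoCloseable resource;

    SharedResource() {
        // Build the expensive resource exactly once per process, and register
        // a shutdown hook so it is closed deterministically instead of
        // leaking its threads until GC.
        resource = () -> { };  // stand-in for KubernetesClientBuilder().build()
        Runtime.getRuntime().addShutdownHook(
                new Thread(this::close, "SharedResource-shutdown"));
    }

    void close() {
        try {
            resource.close();
        } catch (Exception ignored) {
            // best-effort close on shutdown
        }
    }
}
```

The JVM guarantees the enum constructor runs at most once, which makes the singleton thread-safe without explicit locking.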

@wu-sheng
Member Author

wu-sheng commented Feb 22, 2026

Thread Count Benchmark — v10.3.0 vs v10.4.0 (JDK 25)

Environment: 2-node OAP cluster, BanyanDB storage, JDK 25 (eclipse-temurin:25-jre), Istio bookinfo traffic, 10-core Kind node.
Method: 5 thread dump rounds × 60s apart via kill -3, JVM-internal threads excluded.
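As an illustration of the counting method, pool sizes can be tallied from a dump by thread-name prefix. The sample lines below are synthetic, modeled on the BatchQueue pool names in this PR; real dumps come from the JVM's stdout log after `kill -3`:

```java
public class ThreadDumpCount {
    // Count threads whose names start with the BatchQueue prefix. Thread
    // dump lines begin with the quoted thread name.
    static long countBatchQueueThreads(String dump) {
        return dump.lines()
                .filter(line -> line.startsWith("\"BatchQueue-"))
                .count();
    }

    public static void main(String[] args) {
        // Synthetic excerpt; the "-0" suffixes are illustrative.
        String dump = String.join("\n",
                "\"BatchQueue-METRICS_L1_AGGREGATION-0\" #42 daemon",
                "\"BatchQueue-METRICS_L2_PERSISTENCE-0\" #43 daemon",
                "\"BatchQueue-TOPN_PERSISTENCE-0\" #44 daemon",
                "\"grpc-default-worker-ELG-1\" #45 daemon");
        System.out.println(countBatchQueueThreads(dump)); // 3
    }
}
```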

Results

| Pool | v10.3.0 threads | v10.4.0 threads | Notes |
| --- | --- | --- | --- |
| L1 Aggregation (OAL + MAL) | 26 (DataCarrier) | 10 (BatchQueue) | Unified OAL + MAL |
| L2 Persistence (OAL + MAL) | 3 (DataCarrier) | 4 (BatchQueue) | Unified OAL + MAL |
| TopN Persistence | 4 (DataCarrier) | 1 (BatchQueue) | |
| gRPC Remote Client | 1 (DataCarrier) | 1 (BatchQueue) | Per peer |
| Armeria HTTP event loop | 20 | 5 | max(5, cores/4) shared group |
| Armeria HTTP handler | on-demand platform (increasing with payload) | - | Virtual threads on JDK 25+ |
| gRPC event loop | 10 | 10 | Unchanged |
| gRPC handler | on-demand platform (increasing with payload) | - | Virtual threads on JDK 25+ |
| ForkJoinPool (Virtual Thread carrier) | 0 | ~10 | JDK 25+ virtual thread scheduler |
| HttpClient-SelectorManager | 4 | 2 | SharedKubernetesClient |
| Schedulers + others | ~24 | ~24 | Mostly unchanged |
| Total (OAP threads) | 150+ | ~72 | ~50% reduction, stable under load |

Key Changes

  1. DataCarrier → BatchQueue: L1+L2+TopN+Remote reduced from 34 → 16 threads with unified OAL+MAL queues
  2. Armeria event loop: 20 → 5 threads via max(5, cores/4) shared EventLoopGroup across all 4 HTTP servers
  3. Virtual threads (JDK 25+): gRPC and Armeria HTTP handlers now use virtual threads — platform handler threads eliminated, all share ~10 ForkJoinPool carrier threads
  4. SharedKubernetesClient: HttpClient-SelectorManager 4 → 2 threads by replacing 9 separate KubernetesClientBuilder().build() calls with a singleton

Thread counts are stable across all 5 dump rounds and consistent between both OAP pods (only transient pools like grpc-default-executor and ForkJoinPool.commonPool fluctuate by 1-2).

Report: benchmarks/reports/istio-cluster_oap-banyandb/thread-analysis/20260222-214827/

@wu-sheng wu-sheng requested a review from kezhenxu94 February 22, 2026 14:26
wu-sheng and others added 2 commits February 22, 2026 22:28
Use VirtualThreads.createScheduledExecutor() for CacheUpdateTimer,
DataTTLKeeper, HealthChecker, WatermarkWatcher, HierarchyAutoMatching,
and FetchingConfigWatcherRegister. On JDK 25+ these run as virtual
threads instead of dedicated platform threads. On JDK <25 they fall
back to named daemon platform threads.

Benchmark confirmed: 5 named scheduler threads disappear from thread
dump on JRE 25, total OAP threads drop from ~72 to ~67-69.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@wu-sheng
Member Author

JRE 11 vs JRE 25 Thread Benchmark

Environment: 2-node OAP cluster, BanyanDB, Istio bookinfo traffic, 10-core Kind node.
Method: 5 thread dump rounds × 60s apart via kill -3, JVM-internal threads excluded.

JRE 25 (eclipse-temurin:25-jre) — ~67-69 OAP threads

| Pool | Count | Notes |
| --- | --- | --- |
| BatchQueue (L1+L2+TopN+Remote) | 16 | Unchanged |
| armeria-eventloop-epoll | 5 | max(5, cores/4) shared |
| armeria-boss-http (4 ports) | 4 | |
| grpc-default-worker-ELG | 10 | |
| grpc-default-executor | 3 | |
| ForkJoinPool-1-worker (VT carrier) | 10 | Carries all virtual threads |
| ForkJoinPool-1-delayScheduler | 1 | VT scheduled task delays |
| VirtualThread-unblocker | 1 | VT scheduler support |
| HttpClient-SelectorManager | 2-3 | SharedKubernetesClient |
| Schedulers (Alarm, Persistence, etc.) | ~14 | |
| CacheUpdateTimer | 0 | → virtual thread |
| DataTTLKeeper | 0 | → virtual thread |
| HealthChecker | 0 | → virtual thread |
| WatermarkWatcher | 0 | → virtual thread |
| HierarchyAutoMatching | 0 | → virtual thread |
| armeria-common-blocking-tasks | 0 | → virtual thread |

JRE 11 (eclipse-temurin:11-jre) — 66-242 OAP threads

| Pool | Count | Notes |
| --- | --- | --- |
| BatchQueue (L1+L2+TopN+Remote) | 16 | Same as JRE 25 |
| armeria-eventloop-epoll | 5 | Same shared group |
| armeria-boss-http (4 ports) | 4 | |
| grpc-default-worker-ELG | 10 | |
| grpc-default-executor | 3-4 | |
| ForkJoinPool.commonPool-worker | 4-7 | JDK common pool (not VT) |
| K8sClient-executor | 1 | Platform fallback for SharedKubernetesClient |
| HttpClient-SelectorManager | 2-3 | |
| Schedulers (Alarm, Persistence, etc.) | ~14 | |
| CacheUpdateTimer | 1 | Platform thread |
| DataTTLKeeper | 1 | Platform thread |
| HealthChecker | 1 | Platform thread |
| WatermarkWatcher | 1 | Platform thread |
| HierarchyAutoMatching | 1 | Platform thread |
| armeria-common-blocking-tasks | 0-200 | Cached platform pool, max 200 |

Key Differences

| Aspect | JRE 11 | JRE 25 |
| --- | --- | --- |
| Scheduler threads (5 pools) | 5 platform threads | 0 (virtual threads) |
| Armeria HTTP handler | Up to 200 platform threads | 0 (virtual threads) |
| VT carrier (ForkJoinPool) | N/A | ~10 shared carrier threads |
| Worst-case total | 242 (one pod) | 69 |
| Steady-state total | ~66 (excluding blocking) | ~67-69 |

The armeria-common-blocking-tasks pool is the biggest difference — on JRE 11 it ballooned to 200 platform threads on one pod under load, while on JRE 25 all HTTP handler work runs on virtual threads sharing ~10 carrier threads.

Reports: benchmarks/reports/istio-cluster_oap-banyandb/thread-analysis/20260222-232130/ (JRE 25), 20260222-233901/ (JRE 11)

Code change: fix shared event loop formula from max(5, cores/4) to
min(5, cores). The previous formula scaled poorly — max() is a floor,
not a cap, so every machine got at least 5 threads, and cores/4 pushes
past 5 on 24-core (6 threads), defeating the intent to cap at 5.
min(5, cores) gives 2→2, 4→4, 8+→5, matching the design goal.

Documentation: add comprehensive class-level javadoc to both
GRPCServer and HTTPServer explaining their thread models, why we keep
framework default executor pools on JDK <25 (extensions/handlers may
block on long I/O), and how virtual threads replace them on JDK 25+.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@wu-sheng wu-sheng force-pushed the library-batch-queue branch from c1b3b95 to 5c7faa7 Compare February 23, 2026 15:53
@wu-sheng
Copy link
Member Author

Armeria HTTP Event Loop Fix & gRPC/HTTP Thread Model Documentation

Commit: 5c7faa71 — Fix Armeria HTTP event loop sizing and document gRPC/HTTP thread models

Code change

Fixed shared HTTP event loop formula from max(5, cores/4) to min(5, cores).

The previous formula was incorrect:

  • cores/4 gives 0 on 2-core (bumped to 5 by max), 2 on 8-core, 6 on 24-core
  • Intent was to cap at 5 since non-blocking I/O multiplexing needs few threads

min(5, cores) gives the correct behavior:

| cores | old max(5, cores/4) | new min(5, cores) |
| --- | --- | --- |
| 2 | 5 | 2 |
| 4 | 5 | 4 |
| 8 | 5 | 5 |
| 10 | 5 | 5 |
| 24 | 6 | 5 |
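The two formulas side by side, reproducing the values in the table above (sketch only; the real code lives in the shared HTTPServer event loop setup):

```java
public class EventLoopSizing {
    // Old formula: max(5, cores/4) acts as a floor, not a cap — it never
    // goes below 5 and grows past 5 on large machines (24 cores -> 6).
    static int oldSize(int cores) {
        return Math.max(5, cores / 4);
    }

    // Fixed formula: min(5, cores) caps the shared event loop group at 5
    // threads while never allocating more threads than cores.
    static int newSize(int cores) {
        return Math.min(5, cores);
    }

    public static void main(String[] args) {
        for (int cores : new int[] {2, 4, 8, 10, 24}) {
            System.out.println(cores + ": old=" + oldSize(cores)
                    + " new=" + newSize(cores));
        }
    }
}
```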

Thread model documentation

Added comprehensive class-level javadoc to both GRPCServer and HTTPServer documenting:

GRPCServer — three-tier thread model:

  1. Boss event loop (1 thread, accepts connections)
  2. Worker event loop (cores threads, shared via SharedResourcePool, I/O multiplexing)
  3. Application executor (where onMessage/onHalfClose callbacks run)

HTTPServer — two-tier thread model:

  1. Event loop (min(5, cores) shared across all 7 servers)
  2. Blocking task executor (where @Blocking handlers run)

Handler executor policy (both servers)

| | gRPC | HTTP (Armeria) |
| --- | --- | --- |
| JDK 25+ | Virtual threads | Virtual threads |
| JDK <25 | gRPC default CachedThreadPool (unbounded) | Armeria default cached pool (up to 200) |

Both keep framework defaults on JDK <25:

  • gRPC: CachedThreadPool (SynchronousQueue, Integer.MAX_VALUE max threads). SkyWalking extensions may register gRPC handlers with long-blocking I/O — a bounded pool would starve other services.
  • HTTP: Armeria's ScheduledThreadPoolExecutor (200 core threads, unbounded DelayedWorkQueue). HTTP handlers block on storage/DB queries (BanyanDB, Elasticsearch, 10ms–seconds) — a smaller pool would cause request queuing and UI timeouts.

On JDK 25+, virtual threads replace both pools.
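For reference, the JDK <25 gRPC default described above — zero core threads, an effectively unbounded ceiling, and a SynchronousQueue handoff — corresponds to the standard cached-pool construction (this is a sketch of the well-known `Executors.newCachedThreadPool()` internals, not SkyWalking code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    // Any submission that finds no idle worker spawns a new platform thread
    // (SynchronousQueue has no capacity to buffer the task), which is why
    // such pools balloon under bursty blocking load on JDK <25.
    static ExecutorService newUnboundedCachedPool() {
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }
}
```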


Labels

complexity:high Relate to multiple(>4) components of SkyWalking core feature Core and important feature. Sometimes, break backwards compatibility. enhancement Enhancement on performance or codes
