Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
Expand Down Expand Up @@ -71,7 +72,7 @@
public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOperator {

private boolean finished = false;
private TsBlock inputTsBlock;
protected TsBlock inputTsBlock;

protected List<TableAggregator> tableAggregators;
protected final List<ColumnSchema> groupingKeySchemas;
Expand Down Expand Up @@ -104,11 +105,11 @@
// e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0]
protected List<Integer> aggregatorInputChannels;

private QueryDataSource queryDataSource;
protected QueryDataSource queryDataSource;

protected ITableTimeRangeIterator timeIterator;

private boolean allAggregatorsHasFinalResult = false;
protected boolean allAggregatorsHasFinalResult = false;

protected AbstractAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) {

Expand Down Expand Up @@ -193,7 +194,7 @@
}

/** Return true if we have the result of this timeRange. */
protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() throws Exception {

Check warning on line 197 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'Exception'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCdguHjumTnDTQA&open=AZzkgUCdguHjumTnDTQA&pullRequest=17294

Check failure on line 197 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCdguHjumTnDTP-&open=AZzkgUCdguHjumTnDTP-&pullRequest=17294
try {
if (calcFromCachedData()) {
updateResultTsBlock();
Expand Down Expand Up @@ -706,7 +707,7 @@
return true;
}

private void checkIfAllAggregatorHasFinalResult() {
protected void checkIfAllAggregatorHasFinalResult() throws Exception {
if (allAggregatorsHasFinalResult
&& (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
|| tableAggregators.isEmpty())) {
Expand All @@ -729,7 +730,7 @@
}
}

private void nextDevice() {
protected void nextDevice() throws Exception {

Check warning on line 733 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'java.lang.Exception', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCdguHjumTnDTP_&open=AZzkgUCdguHjumTnDTP_&pullRequest=17294

Check warning on line 733 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCdguHjumTnDTP9&open=AZzkgUCdguHjumTnDTP9&pullRequest=17294
currentDeviceIndex++;
this.operatorContext.recordSpecifiedInfo(
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
Expand Down Expand Up @@ -812,6 +813,8 @@
protected List<DeviceEntry> deviceEntries;
protected int deviceCount;

private List<Symbol> outputSymbols;

public AbstractAggTableScanOperatorParameter(
PlanNodeId sourceId,
OperatorContext context,
Expand All @@ -830,7 +833,8 @@
boolean ascending,
boolean canUseStatistics,
List<Integer> aggregatorInputChannels,
String timeColumnName) {
String timeColumnName,
List<Symbol> outputSymbols) {
this.sourceId = sourceId;
this.context = context;
this.aggColumnSchemas = aggColumnSchemas;
Expand All @@ -849,6 +853,11 @@
this.canUseStatistics = canUseStatistics;
this.aggregatorInputChannels = aggregatorInputChannels;
this.timeColumnName = timeColumnName;
this.outputSymbols = outputSymbols;
}

public List<Symbol> getOutputSymbols() {
return outputSymbols;
}

public OperatorContext getOperatorContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void constructCurrentDeviceOperatorTree() {
}
DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);

deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry);
deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry, true);
currentDeviceRootOperator = deviceChildOperatorTreeGenerator.getCurrentDeviceRootOperator();
dataSourceOperators = deviceChildOperatorTreeGenerator.getCurrentDeviceDataSourceOperators();
currentDeviceInit = false;
Expand Down Expand Up @@ -188,6 +188,10 @@ public long ramBytesUsed() {
+ RamUsageEstimator.sizeOfCollection(deviceEntries);
}

public DeviceChildOperatorTreeGenerator getDeviceChildOperatorTreeGenerator() {
return deviceChildOperatorTreeGenerator;
}

public static class TreeNonAlignedDeviceViewScanParameters {
public final OperatorContext context;
public final List<DeviceEntry> deviceEntries;
Expand Down Expand Up @@ -217,7 +221,7 @@ public interface DeviceChildOperatorTreeGenerator {
boolean keepOffsetAndLimitOperatorAfterDeviceIterator();

// Generate the following operator subtree based on the current deviceEntry
void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry);
void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry, boolean needAdaptor);

// Returns the root operator of the subtree
Operator getCurrentDeviceRootOperator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
}

/** Main process logic, calc the last aggregation results of current device. */
private void processCurrentDevice() {
private void processCurrentDevice() throws Exception {

Check warning on line 159 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'Exception'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUC_guHjumTnDTQJ&open=AZzkgUC_guHjumTnDTQJ&pullRequest=17294
if (currentHitCacheIndex < hitCachesIndexes.size()
&& outputDeviceIndex == hitCachesIndexes.get(currentHitCacheIndex)) {
currentDeviceEntry = cachedDeviceEntries.get(currentHitCacheIndex);
Expand All @@ -175,7 +175,7 @@
}
}

private void buildResultUseLastRowCache() {

Check warning on line 178 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 136 to 64, Complexity from 18 to 14, Nesting Level from 4 to 2, Number of Variables from 14 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUC_guHjumTnDTQI&open=AZzkgUC_guHjumTnDTQI&pullRequest=17294
appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
Pair<OptionalLong, TsPrimitiveType[]> currentHitResult =
lastRowCacheResults.get(currentHitCacheIndex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.source.relational;

import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.utils.RamUsageEstimator;

import java.util.List;
import java.util.Optional;

import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;

public class TreeNonAlignedDeviceViewAggregationScanOperator

Check warning on line 41 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This class has 6 parents which is greater than 5 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCmguHjumTnDTQB&open=AZzkgUCmguHjumTnDTQB&pullRequest=17294
extends AbstractDefaultAggTableScanOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(
TreeNonAlignedDeviceViewAggregationScanOperator.class);

private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator;

private Operator child;
private List<Operator> dataSourceOperators;

public TreeNonAlignedDeviceViewAggregationScanOperator(
AbstractAggTableScanOperatorParameter parameter,
IDeviceID.TreeDeviceIdColumnValueExtractor extractor,
DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator) {
super(parameter);
this.extractor = extractor;
this.childOperatorGenerator = childOperatorGenerator;
constructCurrentDeviceOperatorTree();
}

@Override
public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}

@Override
String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
return (String) extractor.extract(deviceEntry.getDeviceID(), idColumnIndex);
}

@Override
protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
try {
// First try to calculate from cached data
if (calcFromCachedData()) {
updateResultTsBlock();
checkIfAllAggregatorHasFinalResult();
return Optional.of(true);
}

// Read from child operator
if (readAndCalcFromChild()) {
updateResultTsBlock();
checkIfAllAggregatorHasFinalResult();
return Optional.of(true);
}

// No more data from child, finish the current device
if (!child.hasNext()) {
updateResultTsBlock();
timeIterator.resetCurTimeRange();
nextDevice();

if (currentDeviceIndex >= deviceCount) {
// All devices consumed
timeIterator.setFinished();
return Optional.of(true);
} else {
// More devices to process, child should provide next device's data
return Optional.of(false);
}
}

return Optional.of(false);
} catch (Exception e) {
throw new RuntimeException("Error while processing aggregation from child operator", e);

Check warning on line 108 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCmguHjumTnDTQC&open=AZzkgUCmguHjumTnDTQC&pullRequest=17294
}
}

/** Read data from child operator and calculate aggregation. */
private boolean readAndCalcFromChild() throws Exception {

Check warning on line 113 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'Exception'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCmguHjumTnDTQF&open=AZzkgUCmguHjumTnDTQF&pullRequest=17294
long start = System.nanoTime();

while (System.nanoTime() - start < leftRuntimeOfOneNextCall && child.hasNext()) {
// Get next TsBlock from child
TsBlock tsBlock = child.nextWithTimer();
if (tsBlock == null || tsBlock.isEmpty()) {
continue;
}

// Calculate aggregation from raw data
if (calcUsingRawData(tsBlock)) {
return true;
}

// If not finished, continue reading from child
}

return false;
}

@Override
protected void nextDevice() throws Exception {
currentDeviceIndex++;
childOperatorGenerator.getCurrentDeviceStartCloseOperator().close();
if (currentDeviceIndex >= deviceEntries.size()) {
return;
}
constructCurrentDeviceOperatorTree();
queryDataSource.reset();
initQueryDataSource(queryDataSource);
this.operatorContext.recordSpecifiedInfo(
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
}

private void constructCurrentDeviceOperatorTree() {
if (this.deviceEntries.isEmpty()) {
return;
}
if (this.deviceEntries.get(this.currentDeviceIndex) == null) {
throw new IllegalStateException(
"Device entries of index " + this.currentDeviceIndex + " is empty");
}
DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);

childOperatorGenerator.generateCurrentDeviceOperatorTree(deviceEntry, false);
child = childOperatorGenerator.getCurrentDeviceRootOperator();
dataSourceOperators = childOperatorGenerator.getCurrentDeviceDataSourceOperators();
}

/** same with {@link DeviceIteratorScanOperator#initQueryDataSource(IQueryDataSource)} */

Check warning on line 163 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Single-line Javadoc comment should be multi-line.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCmguHjumTnDTQH&open=AZzkgUCmguHjumTnDTQH&pullRequest=17294

Check warning on line 163 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

First sentence of Javadoc is missing an ending period.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZzkgUCmguHjumTnDTQG&open=AZzkgUCmguHjumTnDTQG&pullRequest=17294
@Override
public void initQueryDataSource(IQueryDataSource dataSource) {

this.queryDataSource = (QueryDataSource) dataSource;
this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
if (dataSourceOperators == null || dataSourceOperators.isEmpty()) {
return;
}
for (Operator operator : dataSourceOperators) {
((AbstractDataSourceOperator) operator).initQueryDataSource(dataSource);
}
}

@Override
protected void checkIfAllAggregatorHasFinalResult() throws Exception {
if (allAggregatorsHasFinalResult
&& (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
|| tableAggregators.isEmpty())) {
nextDevice();
inputTsBlock = null;

if (currentDeviceIndex >= deviceCount) {
// all devices have been consumed
timeIterator.setFinished();
}

allAggregatorsHasFinalResult = false;
}
}

@Override
public long calculateMaxPeekMemory() {
return child.calculateMaxPeekMemory();
}

@Override
public long calculateMaxReturnSize() {
return child.calculateMaxReturnSize();
}

@Override
public long calculateRetainedSizeAfterCallingNext() {
return child.calculateRetainedSizeAfterCallingNext();
}

@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
+ (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes())
+ RamUsageEstimator.sizeOfCollection(deviceEntries)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child);
}
}
Loading
Loading