Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
_stop_background_threads_latch.count_down();
if (_sender_thread) {
_sender_thread->join();
_send_batch_thread_pool_token->shutdown();
}

Expr::close(_output_expr_ctxs, state);
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
}

for (auto& rs_reader : rs_readers) {
rs_reader->init(&reader_context);
res = rs_reader->init(&reader_context);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
break;
}
}

} while (0);
Expand Down Expand Up @@ -1717,7 +1721,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl

RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader));
rowset_reader->init(&reader_context);
RETURN_NOT_OK(rowset_reader->init(&reader_context));

RowsetWriterContext writer_context;
writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1345,8 +1345,8 @@ int64_t Tablet::prepare_compaction_and_calculate_permits(CompactionType compacti
OLAPStatus res = _base_compaction->prepare_compact();
if (res != OLAP_SUCCESS) {
set_last_base_compaction_failure_time(UnixMillis());
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to pick rowsets for base compaction. res=" << res
<< ", tablet=" << full_name();
}
Expand Down
Binary file removed fe/fe-core/AlterRoutineLoadOperationLogTest
Binary file not shown.
Binary file removed fe/fe-core/diskInfoTest
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {
break CHECK;
}

if (!isAccurateMatch && !value.contains("%")) {
if (hasLabel && !isAccurateMatch && !value.contains("%")) {
value = "%" + value + "%";
}
if (hasLabel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public ShowResultSetMetaData getMetaData() {
builder.addColumn(new Column("PartitionId", ScalarType.createVarchar(30)));
builder.addColumn(new Column("IndexId", ScalarType.createVarchar(30)));
builder.addColumn(new Column("IsSync", ScalarType.createVarchar(30)));
builder.addColumn(new Column("Order", ScalarType.createVarchar(30)));
builder.addColumn(new Column("DetailCmd", ScalarType.createVarchar(30)));
} else {
for (String title : TabletsProcDir.TITLE_NAMES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,4 +563,10 @@ public void setWatermarkTxnId(long watermarkTxnId) {
public long getWatermarkTxnId() {
return watermarkTxnId;
}

/**
 * Reports whether this replica is considered alive.
 *
 * @return {@code false} when the state is {@code CLONE} or {@code DECOMMISSION},
 *         or when {@code isBad()} is true; {@code true} otherwise
 */
public boolean isAlive() {
    // The state checks run before the bad-replica check, in the same order as a
    // short-circuiting conjunction would evaluate them.
    if (getState() == ReplicaState.CLONE) {
        return false;
    }
    if (getState() == ReplicaState.DECOMMISSION) {
        return false;
    }
    return !isBad();
}
}
10 changes: 4 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -419,9 +419,7 @@ public Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriority(
Set<String> hosts = Sets.newHashSet();
for (Replica replica : replicas) {
Backend backend = systemInfoService.getBackend(replica.getBackendId());
if (backend == null || !backend.isAlive() || replica.getState() == ReplicaState.CLONE
|| replica.getState() == ReplicaState.DECOMMISSION
|| replica.isBad() || !hosts.add(backend.getHost())) {
if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())) {
// this replica is not alive,
// or if this replica is on same host with another replica, we also treat it as 'dead',
// so that Tablet Scheduler will create a new replica on different host.
Expand Down
58 changes: 30 additions & 28 deletions fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -97,7 +99,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
* from the tablet scheduler.
*/
private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;


// Shared comparator instance used to order replicas by version count.
// VersionCountComparator declares no fields, so a single instance can be reused;
// it is final so the shared instance cannot be reassigned.
private static final VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator();

public enum Type {
BALANCE, REPAIR
}
Expand Down Expand Up @@ -504,50 +508,33 @@ public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots) throws Sc
if (replica.getLastFailedVersion() > 0) {
continue;
}

if (!replica.checkVersionCatchUp(visibleVersion, visibleVersionHash, false)) {
continue;
}

candidates.add(replica);
}

if (candidates.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find source replica");
}

// choose a replica which slot is available from candidates.
long minVersionCount = Long.MAX_VALUE;
boolean findSrcReplica = false;
// sort replica by version count asc, so that we prefer to choose replicas with fewer versions
Collections.sort(candidates, VERSION_COUNTER_COMPARATOR);
for (Replica srcReplica : candidates) {
PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId());
if (slot == null) {
continue;
}

long srcPathHash = slot.takeSlot(srcReplica.getPathHash());
if (srcPathHash != -1) {
if (!findSrcReplica) {
// version count is set by report process, so it may not be set yet and default value is -1.
// so we need to check it.
minVersionCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount();
setSrc(srcReplica);
findSrcReplica = true;
} else {
long curVerCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount();
if (curVerCount < minVersionCount) {
minVersionCount = curVerCount;
setSrc(srcReplica);
findSrcReplica = true;
}
}
setSrc(srcReplica);
return;
}
}

if (findSrcReplica) {
return;
}

throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot");
}

Expand Down Expand Up @@ -623,7 +610,6 @@ public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWo
if (destPathHash == -1) {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
}

if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of VERSION_INCOMPLETE,
// it means that this replica needs to be able to accept loading data.
Expand Down Expand Up @@ -666,7 +652,7 @@ public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTabl
}
}
}

if (destPathHash != -1) {
PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(destBackendId);
if (slot != null) {
Expand Down Expand Up @@ -1102,4 +1088,20 @@ public String toString() {
}
return sb.toString();
}

// Orders replicas by version count, ascending.
// A version count of -1 is treated as Long.MAX_VALUE, so such replicas sort
// after every replica that has a real count.
public static class VersionCountComparator implements Comparator<Replica> {
    @Override
    public int compare(Replica r1, Replica r2) {
        return Long.compare(sortKey(r1), sortKey(r2));
    }

    // Maps the -1 placeholder to Long.MAX_VALUE; any other count is used as is.
    private static long sortKey(Replica replica) {
        long reported = replica.getVersionCount();
        return reported == -1 ? Long.MAX_VALUE : reported;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ private Tag chooseProperTag(TabletSchedCtx tabletCtx, boolean forMissingReplica)
Map<Tag, Short> currentAllocMap = Maps.newHashMap();
for (Replica replica : replicas) {
Backend be = infoService.getBackend(replica.getBackendId());
if (be != null) {
if (be != null && be.isAlive() && replica.isAlive()) {
Short num = currentAllocMap.getOrDefault(be.getTag(), (short) 0);
currentAllocMap.put(be.getTag(), (short) (num + 1));
}
Expand Down Expand Up @@ -1196,11 +1196,11 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx,

PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
if (slot == null) {
LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId());
continue;
}

if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) {
long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
if (pathHash != -1) {
return rootPathLoadStatistic;
}
}
Expand All @@ -1209,11 +1209,11 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx,
for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
if (slot == null) {
LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId());
continue;
}

if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) {
long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
if (pathHash != -1) {
return rootPathLoadStatistic;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,15 @@
// SHOW PROC "/cluster_balance/cluster_load_stat"
public class ClusterLoadStatByTag implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add(
"StorageMedium").build();

private Map<String, Tag> tagMap = Maps.newHashMap();
"Tag").build();

@Override
public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
Set<Tag> tags = Sets.newHashSet();
for (long beId : beIds) {
Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
if (be != null) {
tags.add(be.getTag());
}
}
Set<Tag> tags = genTagMap();
for (Tag tag : tags) {
result.addRow(Lists.newArrayList(tag.toKey()));
tagMap.put(tag.toKey(), tag);
}
return result;
}
Expand All @@ -64,11 +54,27 @@ public boolean register(String name, ProcNodeInterface node) {

/**
 * Resolves a tag key to the proc node that shows load statistics for that tag.
 *
 * The set of tags is recomputed through {@code genTagMap()} on every call and
 * indexed by {@code Tag.toKey()} before the lookup.
 *
 * @param name the tag key to look up
 * @return a new {@code ClusterLoadStatByTagAndMedium} for the matching tag
 * @throws AnalysisException if no tag has a key equal to {@code name}
 */
@Override
public ProcNodeInterface lookup(String name) throws AnalysisException {
    Map<String, Tag> tagsByKey = Maps.newHashMap();
    for (Tag candidate : genTagMap()) {
        tagsByKey.put(candidate.toKey(), candidate);
    }

    Tag requested = tagsByKey.get(name);
    if (requested != null) {
        return new ClusterLoadStatByTagAndMedium(requested);
    }
    throw new AnalysisException("No such tag: " + name);
}

/**
 * Collects the distinct tags of the backends returned by
 * {@code getBackendIds(false)}.
 *
 * Despite the name, the result is a set of tags, not a map. Backend ids whose
 * lookup returns {@code null} are skipped.
 * NOTE(review): the {@code false} argument presumably means "do not restrict to
 * alive backends" - confirm against SystemInfoService.
 *
 * @return a new set containing the tag of every backend that could be resolved
 */
private Set<Tag> genTagMap() {
    Set<Tag> distinctTags = Sets.newHashSet();
    for (long backendId : Catalog.getCurrentSystemInfo().getBackendIds(false)) {
        Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
        if (backend == null) {
            continue;
        }
        distinctTags.add(backend.getTag());
    }
    return distinctTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.common.proc;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
Expand Down Expand Up @@ -133,32 +134,44 @@ static class DBStatistic {
.stream().map(AgentTask::getTabletId).collect(Collectors.toSet());

SystemInfoService infoService = Catalog.getCurrentSystemInfo();
ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex();
List<Long> aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true);
db.getTables().stream().filter(t -> t != null && t.getType() == TableType.OLAP).forEach(t -> {
++tableNum;
OlapTable olapTable = (OlapTable) t;
ColocateTableIndex.GroupId groupId = colocateTableIndex.isColocateTable(olapTable.getId()) ?
colocateTableIndex.getGroup(olapTable.getId()) : null;
olapTable.readLock();
try {
for (Partition partition : olapTable.getAllPartitions()) {
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
++partitionNum;
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
++indexNum;
for (Tablet tablet : materializedIndex.getTablets()) {
List<Tablet> tablets = materializedIndex.getTablets();
for (int i = 0; i < tablets.size(); ++i) {
Tablet tablet = tablets.get(i);
++tabletNum;
replicaNum += tablet.getReplicas().size();

Pair<TabletStatus, Priority> res = tablet.getHealthStatusWithPriority(
infoService, db.getClusterName(),
partition.getVisibleVersion(), partition.getVisibleVersionHash(),
replicaAlloc, aliveBeIdsInCluster);
TabletStatus res = null;
if (groupId != null) {
Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i);
res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, backendsSet);
} else {
Pair<TabletStatus, Priority> pair = tablet.getHealthStatusWithPriority(
infoService, db.getClusterName(),
partition.getVisibleVersion(), partition.getVisibleVersionHash(),
replicaAlloc, aliveBeIdsInCluster);
res = pair.first;
}

// here we treat REDUNDANT as HEALTHY, for user friendly.
if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT
&& res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR
&& res.first != TabletStatus.UNRECOVERABLE) {
if (res != TabletStatus.HEALTHY && res != TabletStatus.REDUNDANT
&& res != TabletStatus.COLOCATE_REDUNDANT && res != TabletStatus.NEED_FURTHER_REPAIR
&& res != TabletStatus.UNRECOVERABLE) {
unhealthyTabletIds.add(tablet.getId());
} else if (res.first == TabletStatus.UNRECOVERABLE) {
} else if (res == TabletStatus.UNRECOVERABLE) {
unrecoverableTabletIds.add(tablet.getId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ private void runPendingJobs() {
task = new HadoopLoadPendingTask(job);
break;
default:
LOG.warn("unknown etl job type. type: {}", etlJobType.name());
LOG.warn("unknown etl job type. type: {}, job id: {}, label: {}, db: {}",
etlJobType.name(), job.getId(), job.getLabel(), job.getDbId());
break;
}
if (task != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ PlanFragment createInsertFragment(
*/
private PlanFragment createPlanFragments(
PlanNode root, boolean isPartitioned,
long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException, AnalysisException {
long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException {
ArrayList<PlanFragment> childFragments = Lists.newArrayList();
for (PlanNode child : root.getChildren()) {
// allow child fragments to be partitioned, unless they contain a limit clause
Expand Down Expand Up @@ -272,7 +272,7 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment)
* fragment
* TODO: hbase scans are range-partitioned on the row key
*/
private PlanFragment createScanFragment(PlanNode node) {
private PlanFragment createScanFragment(PlanNode node) throws UserException {
if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) {
return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
} else if (node instanceof SchemaScanNode) {
Expand Down
Loading