Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ before_script:
- rm -rf site-repo

script:
- npm run build
- travis_wait 30 npm run build

after_success:
- if [ "$TRAVIS_EVENT_TYPE" != "push" ]; then exit 0; fi
Expand Down
16 changes: 12 additions & 4 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,10 @@ alter_stmt ::=
{:
RESULT = new AlterDatabaseRename(dbName, newDbName);
:}
| KW_ALTER KW_DATABASE ident:dbName KW_SET KW_PROPERTIES LPAREN key_value_map:map RPAREN
{:
RESULT = new AlterDatabasePropertyStmt(dbName, map);
:}
| KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties
opt_datasource_properties:datasourceProperties
{:
Expand Down Expand Up @@ -1084,13 +1088,13 @@ opt_enable_feature_properties ::=
;

alter_system_clause ::=
KW_ADD KW_BACKEND string_list:hostPorts
KW_ADD KW_BACKEND string_list:hostPorts opt_properties:properties
{:
RESULT = new AddBackendClause(hostPorts, false);
RESULT = new AddBackendClause(hostPorts, false, properties);
:}
| KW_ADD KW_FREE KW_BACKEND string_list:hostPorts
{:
RESULT = new AddBackendClause(hostPorts, true);
RESULT = new AddBackendClause(hostPorts, true, Maps.newHashMap());
:}
| KW_ADD KW_BACKEND KW_TO ident:clusterName string_list:hostPorts
{:
Expand Down Expand Up @@ -1141,7 +1145,11 @@ alter_system_clause ::=
| KW_SET KW_LOAD KW_ERRORS KW_HUB opt_properties:properties
{:
RESULT = new AlterLoadErrorUrlClause(properties);
:}
:}
| KW_MODIFY KW_BACKEND string_list:hostPorts KW_SET LPAREN key_value_map:properties RPAREN
{:
RESULT = new ModifyBackendClause(hostPorts, properties);
:}
;

// Sync Stmt
Expand Down
22 changes: 11 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.catalog.View;
Expand Down Expand Up @@ -566,11 +567,10 @@ public void modifyPartitionsProperty(Database db,

// get value from properties here
// 1. data property
DataProperty newDataProperty =
PropertyAnalyzer.analyzeDataProperty(properties, null);
// 2. replication num
short newReplicationNum =
PropertyAnalyzer.analyzeReplicationNum(properties, (short) -1);
DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(properties, null);
// 2. replica allocation
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
Catalog.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc);
// 3. in memory
boolean newInMemory = PropertyAnalyzer.analyzeBooleanProp(properties,
PropertyAnalyzer.PROPERTIES_INMEMORY, false);
Expand All @@ -586,9 +586,9 @@ public void modifyPartitionsProperty(Database db,
if (newDataProperty != null) {
partitionInfo.setDataProperty(partition.getId(), newDataProperty);
}
// 2. replication num
if (newReplicationNum != (short) -1) {
partitionInfo.setReplicationNum(partition.getId(), newReplicationNum);
// 2. replica allocation
if (!replicaAlloc.isNotSet()) {
partitionInfo.setReplicaAllocation(partition.getId(), replicaAlloc);
}
// 3. in memory
boolean oldInMemory = partitionInfo.getIsInMemory(partition.getId());
Expand All @@ -600,7 +600,7 @@ public void modifyPartitionsProperty(Database db,
partitionInfo.setTabletType(partition.getId(), tTabletType);
}
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(), partition.getId(),
newDataProperty, newReplicationNum, hasInMemory ? newInMemory : oldInMemory);
newDataProperty, replicaAlloc, hasInMemory ? newInMemory : oldInMemory);
modifyPartitionInfos.add(info);
}

Expand All @@ -618,8 +618,8 @@ public void replayModifyPartition(ModifyPartitionInfo info) throws MetaNotFoundE
if (info.getDataProperty() != null) {
partitionInfo.setDataProperty(info.getPartitionId(), info.getDataProperty());
}
if (info.getReplicationNum() != (short) -1) {
partitionInfo.setReplicationNum(info.getPartitionId(), info.getReplicationNum());
if (!info.getReplicaAlloc().isNotSet()) {
partitionInfo.setReplicaAllocation(info.getPartitionId(), info.getReplicaAlloc());
}
partitionInfo.setIsInMemory(info.getPartitionId(), info.isInMemory());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexNam
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
TabletMeta mvTabletMeta = new TabletMeta(dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partitionId);
short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
for (Tablet baseTablet : baseIndex.getTablets()) {
long baseTabletId = baseTablet.getId();
long mvTabletId = catalog.getNextId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ public int tryFinishJob() {
continue;
}

short expectReplicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
short expectReplicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum();
MaterializedIndex rollupIndex = entry.getValue();
for (Tablet rollupTablet : rollupIndex.getTablets()) {
// yiguolei: the rollup tablet only contains the replica that is healthy at rollup time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,8 @@ protected void runRunningJob() throws AlterCancelException {

long visiableVersion = partition.getVisibleVersion();
long visiableVersionHash = partition.getVisibleVersionHash();
short expectReplicationNum = tbl.getPartitionInfo().getReplicationNum(partition.getId());
short expectReplicationNum = tbl.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();


MaterializedIndex rollupIndex = entry.getValue();
for (Tablet rollupTablet : rollupIndex.getTablets()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
Expand Down Expand Up @@ -1333,7 +1334,8 @@ private void createJob(long dbId, OlapTable olapTable, Map<Long, LinkedList<Colu
MaterializedIndex shadowIndex = new MaterializedIndex(shadowIndexId, IndexState.SHADOW);
MaterializedIndex originIndex = partition.getIndex(originIndexId);
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, newSchemaHash, medium);
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partitionId);
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Tablet originTablet : originIndex.getTablets()) {
long originTabletId = originTablet.getId();
long shadowTabletId = catalog.getNextId();
Expand Down Expand Up @@ -1366,7 +1368,7 @@ private void createJob(long dbId, OlapTable olapTable, Map<Long, LinkedList<Colu
healthyReplicaNum++;
}

if (healthyReplicaNum < replicationNum / 2 + 1) {
if (healthyReplicaNum < totalReplicaNum / 2 + 1) {
/*
* TODO(cmy): This is a bad design.
* Because in the schema change job, we will only send tasks to the shadow replicas that have been created,
Expand Down Expand Up @@ -1673,12 +1675,12 @@ public void process(List<AlterClause> alterClauses, String clusterName, Database
}
Catalog.getCurrentCatalog().modifyTableDynamicPartition(db, olapTable, properties);
return;
} else if (properties.containsKey("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
Preconditions.checkNotNull(properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
Catalog.getCurrentCatalog().modifyTableDefaultReplicationNum(db, olapTable, properties);
} else if (properties.containsKey("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
Preconditions.checkNotNull(properties.get("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION));
Catalog.getCurrentCatalog().modifyTableDefaultReplicaAllocation(db, olapTable, properties);
return;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
Catalog.getCurrentCatalog().modifyTableReplicationNum(db, olapTable, properties);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
Catalog.getCurrentCatalog().modifyTableReplicaAllocation(db, olapTable, properties);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public boolean sendTasks() {
List<AgentTask> tasks = new LinkedList<AgentTask>();
for (Partition partition : olapTable.getPartitions()) {
long partitionId = partition.getId();
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partitionId);
short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
for (Long indexId : this.changedIndexIdToSchema.keySet()) {
MaterializedIndex alterIndex = partition.getIndex(indexId);
if (alterIndex == null) {
Expand Down Expand Up @@ -679,7 +679,7 @@ public int tryFinishJob() {
boolean hasUnfinishedPartition = false;
for (Partition partition : olapTable.getPartitions()) {
long partitionId = partition.getId();
short expectReplicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
short expectReplicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum();
boolean hasUnfinishedIndex = false;
for (long indexId : this.changedIndexIdToSchema.keySet()) {
MaterializedIndex materializedIndex = partition.getIndex(indexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ protected void runRunningJob() throws AlterCancelException {

long visiableVersion = partition.getVisibleVersion();
long visiableVersionHash = partition.getVisibleVersionHash();
short expectReplicationNum = tbl.getPartitionInfo().getReplicationNum(partition.getId());
short expectReplicationNum = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum();

Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.analysis.DropBackendClause;
import org.apache.doris.analysis.DropFollowerClause;
import org.apache.doris.analysis.DropObserverClause;
import org.apache.doris.analysis.ModifyBackendClause;
import org.apache.doris.analysis.ModifyBrokerClause;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
Expand Down Expand Up @@ -143,8 +144,8 @@ public synchronized void process(List<AlterClause> alterClauses, String clusterN
&& Catalog.getCurrentCatalog().getCluster(destClusterName) == null) {
throw new DdlException("Cluster: " + destClusterName + " does not exist.");
}
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(),
addBackendClause.isFree(), addBackendClause.getDestCluster());
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(),
addBackendClause.isFree(), addBackendClause.getDestCluster(), addBackendClause.getTag());
} else if (alterClause instanceof DropBackendClause) {
// drop backend
DropBackendClause dropBackendClause = (DropBackendClause) alterClause;
Expand Down Expand Up @@ -188,6 +189,8 @@ public synchronized void process(List<AlterClause> alterClauses, String clusterN
} else if (alterClause instanceof AlterLoadErrorUrlClause) {
AlterLoadErrorUrlClause clause = (AlterLoadErrorUrlClause) alterClause;
Catalog.getCurrentCatalog().getLoadInstance().setLoadErrorHubInfo(clause.getProperties());
} else if (alterClause instanceof ModifyBackendClause) {
Catalog.getCurrentSystemInfo().modifyBackends(((ModifyBackendClause) alterClause));
} else {
Preconditions.checkState(false, alterClause.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,38 @@

package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.resource.Tag;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

public class AddBackendClause extends BackendClause {

// be in free state is not owned by any cluster
protected boolean isFree;
// cluster that backend will be added to
protected String destCluster;
protected Map<String, String> properties = Maps.newHashMap();
private Tag tag;

public AddBackendClause(List<String> hostPorts) {
super(hostPorts);
this.isFree = true;
this.destCluster = "";
}
public AddBackendClause(List<String> hostPorts, boolean isFree) {

public AddBackendClause(List<String> hostPorts, boolean isFree, Map<String, String> properties) {
super(hostPorts);
this.isFree = isFree;
this.destCluster = "";
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newHashMap();
}
}

public AddBackendClause(List<String> hostPorts, String destCluster) {
Expand All @@ -46,6 +57,21 @@ public AddBackendClause(List<String> hostPorts, String destCluster) {
this.destCluster = destCluster;
}

public Tag getTag() {
return tag;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties);
}

@Override
public Map<String, String> getProperties() {
return properties;
}

@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -77,3 +103,4 @@ public String getDestCluster() {
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;

import java.util.Map;

/**
 * DDL statement for {@code ALTER DATABASE <db> SET PROPERTIES (k=v, ...)}.
 *
 * <p>Carries the target database name and the raw key/value property map from the
 * parser to the catalog layer. Property-value validation is not performed here yet
 * (see TODO in {@link #analyze}).
 */
public class AlterDatabasePropertyStmt extends DdlStmt {
    // Target database name as written in the statement.
    private String dbName;
    // Raw properties from the SET PROPERTIES clause; keys/values are not validated here.
    private Map<String, String> properties;

    public AlterDatabasePropertyStmt(String dbName, Map<String, String> properties) {
        this.dbName = dbName;
        this.properties = properties;
    }

    public String getDbName() {
        return dbName;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    /**
     * Performs base statement analysis only; individual property keys/values are
     * currently accepted as-is.
     *
     * @throws UserException if base analysis fails
     */
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        // TODO: add some property check
    }

    /**
     * Reconstructs the SQL text of this statement, e.g.
     * {@code ALTER DATABASE db SET PROPERTIES ("k"="v", ...)}.
     */
    @Override
    public String toSql() {
        return "ALTER DATABASE " + dbName + " SET PROPERTIES ("
                + new PrintableMap<String, String>(properties, "=", true, false, ",") + ")";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void analyze(Analyzer analyzer) throws UserException {
|| (alterClause instanceof AddFollowerClause)
|| (alterClause instanceof DropFollowerClause)
|| (alterClause instanceof ModifyBrokerClause)
|| (alterClause instanceof AlterLoadErrorUrlClause));
|| (alterClause instanceof AlterLoadErrorUrlClause)
|| (alterClause instanceof ModifyBackendClause));

alterClause.analyze(analyzer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.ExtractCommonFactorsRule;
import org.apache.doris.rewrite.FoldConstantsRule;
import org.apache.doris.rewrite.NormalizeBinaryPredicatesRule;
import org.apache.doris.rewrite.RewriteAliasFunctionRule;
import org.apache.doris.rewrite.RewriteEncryptKeyRule;
import org.apache.doris.rewrite.RewriteFromUnixTimeRule;
import org.apache.doris.rewrite.NormalizeBinaryPredicatesRule;
import org.apache.doris.rewrite.SimplifyInvalidDateBinaryPredicatesDateRule;
import org.apache.doris.rewrite.mvrewrite.CountDistinctToBitmap;
import org.apache.doris.rewrite.mvrewrite.CountDistinctToBitmapOrHLLRule;
Expand Down Expand Up @@ -463,6 +463,7 @@ public TupleDescriptor registerTableRef(TableRef ref) throws AnalysisException {
// aliasMap_.put(alias, result);
tupleByAlias.put(alias, result);
}

tableRefMap_.put(result.getId(), ref);

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

public class BackendClause extends AlterClause {
protected List<String> hostPorts;

protected List<Pair<String, Integer>> hostPortPairs;

protected BackendClause(List<String> hostPorts) {
Expand Down
Loading