Merged

37 commits
1a15f53  udf: replace function  (Aug 13, 2020)
7d072b9  Merge remote-tracking branch 'upstream/master' into str_replace  (Aug 14, 2020)
391158f  udf: replace function  (Aug 14, 2020)
5eb52e1  udf: replace function  (Aug 14, 2020)
a79ea91  Merge remote-tracking branch 'upstream/master' into str_replace  (Aug 18, 2020)
86f841f  udf: replace function  (Aug 18, 2020)
ade1afa  udf: replace function  (Aug 18, 2020)
1d95a49  udf: replace function  (Aug 18, 2020)
433eb87  Merge remote-tracking branch 'upstream/master' into str_replace  (Aug 24, 2020)
c62d239  Merge remote-tracking branch 'upstream/master' into str_replace  (Aug 25, 2020)
0a8db8c  Merge remote-tracking branch 'upstream/master' into str_replace  (Aug 25, 2020)
02d3f86  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 1, 2020)
41da0ba  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 8, 2020)
de4e523  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 11, 2020)
a863b0d  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 17, 2020)
024c422  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 22, 2020)
e1656c7  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 23, 2020)
837e037  Merge remote-tracking branch 'upstream/master' into str_replace  (Sep 27, 2020)
a36d3e7  Merge remote-tracking branch 'upstream/master' into str_replace  (Oct 17, 2020)
43cfa2b  Merge remote-tracking branch 'upstream/master' into str_replace  (Oct 23, 2020)
641efb5  Merge remote-tracking branch 'upstream/master' into str_replace  (Oct 24, 2020)
5066131  Merge remote-tracking branch 'upstream/master' into str_replace  (Oct 26, 2020)
dde044b  Merge remote-tracking branch 'upstream/master' into str_replace  (Oct 27, 2020)
8f8c823  Merge remote-tracking branch 'upstream/master' into str_replace  (Oct 28, 2020)
f7704ba  Merge remote-tracking branch 'upstream/master' into str_replace  (Nov 1, 2020)
8726e7a  Merge remote-tracking branch 'upstream/master' into str_replace  (Nov 4, 2020)
4dfc785  Merge remote-tracking branch 'upstream/master' into str_replace  (Nov 6, 2020)
51f8196  support using udf when loading data  (Nov 7, 2020)
9a3ae2d  update  (Nov 8, 2020)
6db8e74  support udf when loading data  (Nov 9, 2020)
0f00849  support udf when loading data  (Nov 9, 2020)
080a914  support udf when loading data  (Nov 9, 2020)
07fa6e2  Merge remote-tracking branch 'upstream/master' into support_udf_load  (Nov 9, 2020)
eabea85  support udf when loading data  (Nov 9, 2020)
134b036  update  (Nov 10, 2020)
74c3713  support udf  (Nov 10, 2020)
e3d94f4  update  (Nov 11, 2020)
@@ -51,6 +51,8 @@ public abstract class StatementBase implements ParseNode {

private OriginStatement origStmt;

private UserIdentity userInfo;

protected StatementBase() { }

/**
@@ -169,6 +171,14 @@ public OriginStatement getOrigStmt() {
return origStmt;
}

public UserIdentity getUserInfo() {
return userInfo;
}

public void setUserInfo(UserIdentity userInfo) {
this.userInfo = userInfo;
}

/**
* Resets the internal analysis state of this node.
* For easier maintenance, class members that need to be reset are grouped into
@@ -196,6 +196,8 @@ public final class FeMetaVersion {
public static final int VERSION_91 = 91;
// for mysql external table support resource
public static final int VERSION_92 = 92;
//jira: 4863 for load job support udf
public static final int VERSION_93 = 93;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_92;
public static final int VERSION_CURRENT = VERSION_93;
}
@@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -70,9 +71,10 @@ public BrokerLoadJob() {
this.jobType = EtlJobType.BROKER;
}

public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc,
OriginStatement originStmt, UserIdentity userInfo)
throws MetaNotFoundException {
super(dbId, label, originStmt);
super(dbId, label, originStmt, userInfo);
this.timeoutSecond = Config.broker_load_default_timeout_second;
this.brokerDesc = brokerDesc;
this.jobType = EtlJobType.BROKER;
@@ -194,7 +196,8 @@ brokerFileGroups, getDeadlineMs(), execMemLimit,
strictMode, transactionId, this, timezone, timeoutSecond);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
task.init(loadId, attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey));
task.init(loadId, attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey), getUserInfo());
idToTasks.put(task.getSignature(), task);
// idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
// use newLoadingTasks to save new created loading tasks and submit them later.
@@ -22,6 +22,7 @@
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@@ -73,6 +74,8 @@ public abstract class BulkLoadJob extends LoadJob {
// the expr of columns will be reanalyze when the log is replayed
private OriginStatement originStmt;

private UserIdentity userInfo;

// include broker desc and data desc
protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();
@@ -86,10 +89,11 @@ public BulkLoadJob() {
super();
}

public BulkLoadJob(long dbId, String label, OriginStatement originStmt) throws MetaNotFoundException {
public BulkLoadJob(long dbId, String label, OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException {
super(dbId, label);
this.originStmt = originStmt;
this.authorizationInfo = gatherAuthInfo();
this.userInfo = userInfo;

if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -113,11 +117,11 @@ public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
switch (stmt.getEtlJobType()) {
case BROKER:
bulkLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getBrokerDesc(), stmt.getOrigStmt());
stmt.getBrokerDesc(), stmt.getOrigStmt(), stmt.getUserInfo());
break;
case SPARK:
bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getResourceDesc(), stmt.getOrigStmt());
stmt.getResourceDesc(), stmt.getOrigStmt(), stmt.getUserInfo());
break;
case MINI:
case DELETE:
@@ -296,6 +300,7 @@ public void write(DataOutput out) throws IOException {
super.write(out);
brokerDesc.write(out);
originStmt.write(out);
userInfo.write(out);

out.writeInt(sessionVariables.size());
for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
@@ -325,6 +330,11 @@ public void readFields(DataInput in) throws IOException {
// The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
// The origin stmt will be analyzed after the replay is completed.

if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_93) {
userInfo = UserIdentity.read(in);
} else {
userInfo = new UserIdentity("","");
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
@@ -338,4 +348,11 @@ public void readFields(DataInput in) throws IOException {
}
}

public UserIdentity getUserInfo() {
return userInfo;
}

public void setUserInfo(UserIdentity userInfo) {
this.userInfo = userInfo;
}
}
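The version-gated read in BulkLoadJob.readFields() above is the usual compatibility pattern that accompanies a FeMetaVersion bump: the new field is always written, but it is only read back when the journal being replayed was produced by a version that wrote it; older journals fall back to a default. A minimal, self-contained sketch of that pattern in plain Java follows (GatedReadSketch, readUserInfo, and the "root@%" string are illustrative stand-ins, not Doris APIs):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative stand-in for the VERSION_93 gate above: a field added under a
// newer meta version is only deserialized when the journal is at least that
// version; older journals get a default value instead.
public class GatedReadSketch {
    static final int VERSION_93 = 93;

    static String readUserInfo(DataInput in, int journalVersion) throws IOException {
        if (journalVersion >= VERSION_93) {
            return in.readUTF();   // new-format journal: the field is present
        }
        return "";                 // old-format journal: fall back to an empty identity
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new DataOutputStream(buf).writeUTF("root@%");

        DataInput newJournal = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readUserInfo(newJournal, 93));   // prints root@%

        DataInput oldJournal = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        System.out.println(readUserInfo(oldJournal, 92));   // prints the empty default
    }
}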
@@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.LoadException;
@@ -83,9 +84,10 @@ public LoadLoadingTask(Database db, OlapTable table,
this.timeoutS = timeoutS;
}

public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone, this.timeoutS);
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table,
brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, userInfo);
planner.plan(loadId, fileStatusList, fileNum);
}

@@ -22,6 +22,7 @@
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
@@ -32,6 +33,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.OlapTableSink;
@@ -65,7 +67,7 @@ public class LoadingTaskPlanner {
private final List<BrokerFileGroup> fileGroups;
private final boolean strictMode;
private final long timeoutS; // timeout of load job, in second

private UserIdentity userInfo;
// Something useful
// ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
private Analyzer analyzer = new Analyzer(Catalog.getCurrentCatalog(), new ConnectContext());
@@ -79,7 +81,8 @@ public class LoadingTaskPlanner {

public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, String timezone, long timeoutS) {
boolean strictMode, String timezone, long timeoutS,
UserIdentity userInfo) {
this.loadJobId = loadJobId;
this.txnId = txnId;
this.dbId = dbId;
@@ -89,14 +92,13 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
this.strictMode = strictMode;
this.analyzer.setTimezone(timezone);
this.timeoutS = timeoutS;

/*
* TODO(cmy): UDF currently belongs to a database. Therefore, before using UDF,
* we need to check whether the user has corresponding permissions on this database.
* But here we have lost user information and therefore cannot check permissions.
* So here we first prohibit users from using UDF in load. If necessary, improve it later.
*/
this.analyzer.setUDFAllowed(false);
this.userInfo = userInfo;
if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo,
Catalog.getCurrentCatalog().getDb(dbId).getFullName(), PrivPredicate.SELECT)) {
this.analyzer.setUDFAllowed(true);
} else {
this.analyzer.setUDFAllowed(false);
}
}

public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
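The constructor change above replaces the old blanket setUDFAllowed(false) with a privilege-driven toggle: UDFs are permitted in a load plan exactly when the submitting user passes Catalog.getCurrentCatalog().getAuth().checkDbPriv(...) with PrivPredicate.SELECT on the target database. A small standalone sketch of that gate in plain Java follows (UdfGateSketch, hasSelectPriv, the grants map, and the alice/bob/example_db names are made up for illustration, not Doris code):

import java.util.Map;
import java.util.Set;

// Stand-in for the SELECT-privilege check that now decides whether the
// planner's analyzer allows UDFs for a given load job.
public class UdfGateSketch {

    static boolean hasSelectPriv(Map<String, Set<String>> grants, String user, String db) {
        return grants.getOrDefault(user, Set.of()).contains(db);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> grants = Map.of("alice@%", Set.of("example_db"));

        boolean udfAllowedForAlice = hasSelectPriv(grants, "alice@%", "example_db");
        boolean udfAllowedForBob = hasSelectPriv(grants, "bob@%", "example_db");

        System.out.println("alice may use UDFs in load: " + udfAllowedForAlice); // true
        System.out.println("bob may use UDFs in load: " + udfAllowedForBob);     // false
    }
}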
@@ -25,6 +25,7 @@
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -145,9 +146,10 @@ public SparkLoadJob() {
jobType = EtlJobType.SPARK;
}

public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt)
public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc,
OriginStatement originStmt, UserIdentity userInfo)
throws MetaNotFoundException {
super(dbId, label, originStmt);
super(dbId, label, originStmt, userInfo);
this.resourceDesc = resourceDesc;
timeoutSecond = Config.spark_load_default_timeout_second;
jobType = EtlJobType.SPARK;
@@ -186,6 +186,7 @@ private void handleQuery() {
}
parsedStmt = stmts.get(i);
parsedStmt.setOrigStmt(new OriginStatement(originStmt, i));
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
executor.execute();
@@ -404,6 +404,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException {
try {
parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx);
parsedStmt.setOrigStmt(originStmt);
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
} catch (Error e) {
LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e);
throw new AnalysisException("sql parsing error, please check your sql");
@@ -19,14 +19,17 @@

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.jmockit.Deencapsulation;
@@ -38,6 +41,9 @@
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.transaction.TransactionState;

@@ -52,12 +58,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;

import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.doris.thrift.TUniqueId;

public class BrokerLoadJobTest {

@@ -323,6 +333,48 @@ public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment at
Assert.assertEquals(3, idToTasks.size());
}

@Test
public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment,
@Mocked Catalog catalog,
@Injectable BrokerDesc brokerDesc,
@Injectable LoadTaskCallback callback,
@Injectable Database database,
@Injectable FileGroupAggKey aggKey,
@Mocked OlapTable olapTable,
@Mocked PlanFragment sinkFragment,
@Mocked OlapTableSink olapTableSink,
@Mocked BrokerScanNode scanNode) throws Exception{
List<Column> schema = new ArrayList<>();
schema.add(new Column("a", PrimitiveType.BIGINT));
Map<String, String> properties = new HashMap<>();
properties.put("broker_name", "test");
properties.put("path", "hdfs://www.test.com");
BrokerTable brokerTable = new BrokerTable(123L, "test", schema, properties);
BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable);
List<Long> partitionIds = new ArrayList<>();
partitionIds.add(123L);
Deencapsulation.setField(brokerFileGroup, "partitionIds", partitionIds);
List<BrokerFileGroup> fileGroups = Lists.newArrayList();
fileGroups.add(brokerFileGroup);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
100, 100,false, 100, callback, "", 100);
try {
UserIdentity userInfo = new UserIdentity("root", "localhost");
userInfo.setIsAnalyzed();
task.init(loadId,
attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey),
userInfo);
LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner");
Analyzer al = Deencapsulation.getField(planner, "analyzer");
Assert.assertFalse(al.isUDFAllowed());
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment,
@Injectable LoadTask loadTask1,
@@ -26,6 +26,7 @@
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -217,7 +218,7 @@ public void testExecute(@Mocked Catalog catalog, @Mocked SparkLoadPendingTask pe
};

ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0));
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
job.execute();

// check transaction id and id to tasks
@@ -228,7 +229,7 @@ public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException {
@Test
public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException {
ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0));
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
attachment.setAppId(appId);
attachment.setOutputPath(etlOutputPath);
@@ -247,7 +248,7 @@ private SparkLoadJob getEtlStateJob(String originStmt) throws MetaNotFoundExcept
sparkConfigs.put("spark.master", "yarn");
sparkConfigs.put("spark.submit.deployMode", "cluster");
sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0));
SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
job.state = JobState.ETL;
job.maxFilterRatio = 0.15;
job.transactionId = transactionId;