3 changes: 3 additions & 0 deletions itests/src/test/resources/testconfiguration.properties
@@ -51,6 +51,9 @@ minillap.query.files=\
binary_output_format.q,\
bucket5.q,\
bucket6.q,\
cmv_direct.q,\
cmv_direct_with_specified_locations.q,\
cmv_direct_with_suffixed_locations.q,\
create_genericudaf.q,\
create_table.q,\
create_udaf.q,\
@@ -75,7 +75,7 @@ public class CreateMaterializedViewDesc implements DDLDesc, Serializable {
private List<FieldSchema> sortCols;
private List<String> distributeColNames;
private List<FieldSchema> distributeCols;
private Long initialMmWriteId; // Initial MM write ID for CMV and import.
private Long initialWriteId; // Initial write ID for CMV and import.
// The FSOP configuration for the FSOP that is going to write initial data during cmv.
// This is not needed beyond compilation, so it is transient.
private transient FileSinkDesc writer;
@@ -389,12 +389,12 @@ public Table toTable(HiveConf conf) throws HiveException {
return tbl;
}

public void setInitialMmWriteId(Long mmWriteId) {
this.initialMmWriteId = mmWriteId;
public void setInitialWriteId(Long writeId) {
this.initialWriteId = writeId;
}

public Long getInitialMmWriteId() {
return initialMmWriteId;
public Long getInitialWriteId() {
return initialWriteId;
}

public FileSinkDesc getAndUnsetWriter() {
12 changes: 12 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -5010,4 +5010,16 @@ public static boolean arePathsEqualOrWithin(Path p1, Path p2) {
return ((p1.toString().toLowerCase().indexOf(p2.toString().toLowerCase()) > -1) ||
(p2.toString().toLowerCase().indexOf(p1.toString().toLowerCase()) > -1)) ? true : false;
}

public static String getTableOrMVSuffix(Context context, boolean createTableOrMVUseSuffix) {
String suffix = "";
if (createTableOrMVUseSuffix) {
long txnId = Optional.ofNullable(context)
.map(ctx -> ctx.getHiveTxnManager().getCurrentTxnId()).orElse(0L);
if (txnId != 0) {
suffix = AcidUtils.getPathSuffix(txnId);
}
}
return suffix;
}
}
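
For context, a minimal usage sketch of the new Utilities.getTableOrMVSuffix helper (hypothetical caller code, not part of this change; the SuffixExample class, suffixedLocation method, and defaultTablePath parameter are illustrative names only). It shows the behaviour the helper encapsulates: an empty suffix when suffixing is disabled or no transaction is open, otherwise the AcidUtils path suffix for the current transaction id, appended to the table's default location.

// Illustrative only -- assumes Hive's ql module classes are on the classpath.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Utilities;

public final class SuffixExample {
  // Returns the CTAS/CMV target location, suffixed only when useSuffix is true
  // and the Context has an open transaction (getTableOrMVSuffix returns "" otherwise).
  static Path suffixedLocation(Context ctx, Path defaultTablePath, boolean useSuffix) {
    String suffix = Utilities.getTableOrMVSuffix(ctx, useSuffix);
    return new Path(defaultTablePath.toString() + suffix);
  }
}

This mirrors how getCtasOrCMVLocation and handleLineage in SemanticAnalyzer.java below compose the table location with the soft-delete suffix.
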
79 changes: 46 additions & 33 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7610,30 +7610,44 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
destTableIsMaterialization = false;
tableName = HiveTableName.ofNullableWithNoDefault(viewDesc.getViewName());
tblProps = viewDesc.getTblProps();
// Add suffix only when the required confs are present
// and the user has not specified a location for the table.
createTableUseSuffix = (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX)
|| HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))
&& viewDesc.getLocation() == null;
}

destTableIsTransactional = tblProps != null && AcidUtils.isTablePropertyTransactional(tblProps);
if (destTableIsTransactional) {
isNonNativeTable = MetaStoreUtils.isNonNativeTable(tblProps);
boolean isCtas = tblDesc != null && tblDesc.isCTAS();
boolean isCMV = viewDesc != null && qb.isMaterializedView();
isMmTable = isMmCreate = AcidUtils.isInsertOnlyTable(tblProps);
if (!isNonNativeTable && !destTableIsTemporary && isCtas) {
if (!isNonNativeTable && !destTableIsTemporary && (isCtas || isCMV)) {
destTableIsFullAcid = AcidUtils.isFullAcidTable(tblProps);
acidOperation = getAcidType(dest);
isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOperation);
if (isDirectInsert || isMmTable) {
destinationPath = getCtasLocation(tblDesc, createTableUseSuffix);
destinationPath = getCtasOrCMVLocation(tblDesc, viewDesc, createTableUseSuffix);
if (createTableUseSuffix) {
tblDesc.getTblProps().put(SOFT_DELETE_TABLE, Boolean.TRUE.toString());
if (tblDesc != null) {
tblDesc.getTblProps().put(SOFT_DELETE_TABLE, Boolean.TRUE.toString());
} else {
viewDesc.getTblProps().put(SOFT_DELETE_TABLE, Boolean.TRUE.toString());
}
}
// Set the location in context for possible rollback.
ctx.setLocation(destinationPath);
// Setting the location so that metadata transformers
// does not change the location later while creating the table.
tblDesc.setLocation(destinationPath.toString());
if (tblDesc != null) {
tblDesc.setLocation(destinationPath.toString());
} else {
viewDesc.setLocation(destinationPath.toString());
}
} else {
// Set the location in context for possible rollback.
ctx.setLocation(getCtasLocation(tblDesc, createTableUseSuffix));
ctx.setLocation(getCtasOrCMVLocation(tblDesc, viewDesc, createTableUseSuffix));
}
}
try {
@@ -7645,14 +7659,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
} catch (LockException ex) {
throw new SemanticException("Failed to allocate write Id", ex);
}
if (isMmTable) {
if (isMmTable || isDirectInsert) {
if (tblDesc != null) {
tblDesc.setInitialWriteId(writeId);
} else {
viewDesc.setInitialMmWriteId(writeId);
viewDesc.setInitialWriteId(writeId);
}
} else if (isDirectInsert) {
tblDesc.setInitialWriteId(writeId);
}
}

@@ -7917,17 +7929,13 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
canBeMerged, destinationTable, writeId, isMmCreate, destType, qb, isDirectInsert, acidOperation, moveTaskId);
if (isMmCreate) {
if (isMmCreate || (qb.isCTAS() || qb.isMaterializedView()) && isDirectInsert) {
// Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
if (tableDesc != null) {
tableDesc.setWriter(fileSinkDesc);
} else {
createVwDesc.setWriter(fileSinkDesc);
}
} else if (qb.isCTAS() && isDirectInsert) {
if (tableDesc != null) {
tableDesc.setWriter(fileSinkDesc);
}
}

if (fileSinkDesc.getInsertOverwrite()) {
@@ -7990,19 +7998,28 @@ protected boolean enableColumnStatsCollecting() {
return true;
}

private Path getCtasLocation(CreateTableDesc tblDesc, boolean createTableWithSuffix) throws SemanticException {
private Path getCtasOrCMVLocation(CreateTableDesc tblDesc, CreateMaterializedViewDesc viewDesc,
boolean createTableWithSuffix) throws SemanticException {
Path location;
String[] names;
String protoName = null;
Table tbl;
try {
String protoName = tblDesc.getDbTableName();
String[] names = Utilities.getDbTableName(protoName);

// Handle table translation initially and if not present
// use default table path.
// Property modifications of the table is handled later.
// We are interested in the location if it has changed
// due to table translation.
Table tbl = tblDesc.toTable(conf);
tbl = db.getTranslateTableDryrun(tbl.getTTable());
if (tblDesc != null) {
protoName = tblDesc.getDbTableName();

// Handle table translation initially and if not present
// use default table path.
// Property modifications of the table is handled later.
// We are interested in the location if it has changed
// due to table translation.
tbl = tblDesc.toTable(conf);
tbl = db.getTranslateTableDryrun(tbl.getTTable());
} else {
protoName = viewDesc.getViewName();
tbl = viewDesc.toTable(conf);
}
names = Utilities.getDbTableName(protoName);

Warehouse wh = new Warehouse(conf);
if (tbl.getSd() == null
Expand All @@ -8013,9 +8030,8 @@ private Path getCtasLocation(CreateTableDesc tblDesc, boolean createTableWithSuf
}

if (createTableWithSuffix) {
long txnId = ctx.getHiveTxnManager().getCurrentTxnId();
String suffix = AcidUtils.getPathSuffix(txnId);
location = new Path(location.toString() + suffix);
location = new Path(location.toString() +
Utilities.getTableOrMVSuffix(ctx, createTableWithSuffix));
}

return location;
@@ -8317,11 +8333,8 @@ private void handleLineage(Table destinationTable, LoadTableDesc ltd, Operator o
Path tlocation = null;
String tName = Utilities.getDbTableName(tableDesc.getDbTableName())[1];
try {
String suffix = "";
if (AcidUtils.isTableSoftDeleteEnabled(destinationTable, conf)) {
long txnId = ctx.getHiveTxnManager().getCurrentTxnId();
suffix = AcidUtils.getPathSuffix(txnId);
}
String suffix = Utilities.getTableOrMVSuffix(ctx,
AcidUtils.isTableSoftDeleteEnabled(destinationTable, conf));
Warehouse wh = new Warehouse(conf);
tlocation = wh.getDefaultTablePath(db.getDatabase(tableDesc.getDatabaseName()),
tName + suffix, tableDesc.isExternal());
23 changes: 5 additions & 18 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -98,7 +98,6 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
@@ -473,10 +472,10 @@ private void setLoadFileLocation(
} else {
CreateMaterializedViewDesc cmv = pCtx.getCreateViewDesc();
dataSink = cmv.getAndUnsetWriter();
txnId = cmv.getInitialMmWriteId();
txnId = cmv.getInitialWriteId();
loc = cmv.getLocation();
}
Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc);
Path location = (loc == null) ? getDefaultCtasOrCMVLocation(pCtx) : new Path(loc);
if (pCtx.getQueryProperties().isCTAS()) {
CreateTableDesc ctd = pCtx.getCreateTable();
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.CREATE_TABLE_AS_EXTERNAL)) {
@@ -511,7 +510,7 @@ private void setLoadFileLocation(
lfd.setTargetDir(location);
}

private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticException {
private Path getDefaultCtasOrCMVLocation(final ParseContext pCtx) throws SemanticException {
try {
String protoName = null, suffix = "";
boolean isExternal = false;
@@ -522,11 +521,11 @@ private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticExce
protoName = pCtx.getCreateTable().getDbTableName();
isExternal = pCtx.getCreateTable().isExternal();
createTableOrMVUseSuffix &= AcidUtils.isTransactionalTable(pCtx.getCreateTable());
suffix = getTableOrMVSuffix(pCtx, createTableOrMVUseSuffix);
suffix = Utilities.getTableOrMVSuffix(pCtx.getContext(), createTableOrMVUseSuffix);
} else if (pCtx.getQueryProperties().isMaterializedView()) {
protoName = pCtx.getCreateViewDesc().getViewName();
createTableOrMVUseSuffix &= AcidUtils.isTransactionalView(pCtx.getCreateViewDesc());
suffix = getTableOrMVSuffix(pCtx, createTableOrMVUseSuffix);
suffix = Utilities.getTableOrMVSuffix(pCtx.getContext(), createTableOrMVUseSuffix);
}
String[] names = Utilities.getDbTableName(protoName);
if (!db.databaseExists(names[0])) {
@@ -539,18 +538,6 @@ private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticExce
}
}

public String getTableOrMVSuffix(ParseContext pCtx, boolean createTableOrMVUseSuffix) {
String suffix = "";
if (createTableOrMVUseSuffix) {
long txnId = Optional.ofNullable(pCtx.getContext())
.map(ctx -> ctx.getHiveTxnManager().getCurrentTxnId()).orElse(0L);
if (txnId != 0) {
suffix = AcidUtils.getPathSuffix(txnId);
}
}
return suffix;
}

private void patchUpAfterCTASorMaterializedView(List<Task<?>> rootTasks,
Set<ReadEntity> inputs, Set<WriteEntity> outputs, Task<?> createTask,
boolean createTaskAfterMoveTask) {
52 changes: 52 additions & 0 deletions ql/src/test/queries/clientpositive/cmv_direct.q
@@ -0,0 +1,52 @@
-- SORT_QUERY_RESULTS
--! qt:dataset:alltypesorc

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.acid.direct.insert.enabled=true;

DROP TABLE IF EXISTS source;

DROP MATERIALIZED VIEW IF EXISTS test_orc_cmv;

DROP MATERIALIZED VIEW IF EXISTS test_orc_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_parquet_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_avro_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_textfile_mmcmv;

CREATE TABLE source STORED AS ORC TBLPROPERTIES('transactional'='true') AS (SELECT cint, cfloat, cdouble, cstring1, ctimestamp1 FROM alltypesorc);

CREATE MATERIALIZED VIEW test_orc_cmv STORED AS ORC TBLPROPERTIES('transactional'='true') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_orc_mmcmv STORED AS ORC TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_parquet_mmcmv STORED AS PARQUET TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_avro_mmcmv STORED AS AVRO TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_textfile_mmcmv STORED AS TEXTFILE TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

SELECT * FROM test_orc_cmv ORDER BY cint;

SELECT * FROM test_orc_mmcmv ORDER BY cint;

SELECT * FROM test_parquet_mmcmv ORDER BY cint;

SELECT * FROM test_avro_mmcmv ORDER BY cint;

SELECT * FROM test_textfile_mmcmv ORDER BY cint;

DROP MATERIALIZED VIEW IF EXISTS test_orc_cmv;

DROP MATERIALIZED VIEW IF EXISTS test_orc_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_parquet_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_avro_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_textfile_mmcmv;

DROP TABLE IF EXISTS source;
52 changes: 52 additions & 0 deletions ql/src/test/queries/clientpositive/cmv_direct_with_specified_locations.q
@@ -0,0 +1,52 @@
-- SORT_QUERY_RESULTS
--! qt:dataset:alltypesorc

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.acid.direct.insert.enabled=true;

DROP TABLE IF EXISTS source;

DROP MATERIALIZED VIEW IF EXISTS test_orc_cmv;

DROP MATERIALIZED VIEW IF EXISTS test_orc_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_parquet_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_avro_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_textfile_mmcmv;

CREATE TABLE source STORED AS ORC TBLPROPERTIES('transactional'='true') AS (SELECT cint, cfloat, cdouble, cstring1, ctimestamp1 FROM alltypesorc);

CREATE MATERIALIZED VIEW test_orc_cmv STORED AS ORC LOCATION '/build/ql/test/data/warehouse/test_cmv_orc' TBLPROPERTIES('transactional'='true') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_orc_mmcmv STORED AS ORC LOCATION '/build/ql/test/data/warehouse/test_mmcmv_orc' TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_parquet_mmcmv STORED AS PARQUET LOCATION '/build/ql/test/data/warehouse/test_mmcmv_parquet' TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_avro_mmcmv STORED AS AVRO LOCATION '/build/ql/test/data/warehouse/test_mmcmv_avro' TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

CREATE MATERIALIZED VIEW test_textfile_mmcmv STORED AS TEXTFILE LOCATION '/build/ql/test/data/warehouse/test_mmcmv_textfile' TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only') AS ((SELECT * FROM source WHERE cint > 200 LIMIT 10) UNION (SELECT * FROM source WHERE cint < -100 LIMIT 10));

SELECT * FROM test_orc_cmv ORDER BY cint;

SELECT * FROM test_orc_mmcmv ORDER BY cint;

SELECT * FROM test_parquet_mmcmv ORDER BY cint;

SELECT * FROM test_avro_mmcmv ORDER BY cint;

SELECT * FROM test_textfile_mmcmv ORDER BY cint;

DROP MATERIALIZED VIEW IF EXISTS test_orc_cmv;

DROP MATERIALIZED VIEW IF EXISTS test_orc_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_parquet_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_avro_mmcmv;

DROP MATERIALIZED VIEW IF EXISTS test_textfile_mmcmv;

DROP TABLE IF EXISTS source;