Refactor alter job #1695
Conversation
Force-pushed from 3edd147 to 56478b5
| exec_mem_limit: Sets the memory limit for the load, in bytes. Default is 2GB. This is the limit for a single BE node. | ||
| A load may be spread across multiple BEs. Suppose 1GB of data needs at most 5GB of memory when processed on a single node; if that 1GB file is split across 2 nodes, each node theoretically needs 2.5GB, so this parameter can be set to 2684354560, i.e. 2.5GB. | ||
| strict mode: Whether to apply strict checks to the data. Default is true. | ||
| strict mode: Whether to apply strict checks to the data. Default is true. |
The alignment is wrong.
fixed
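For reference, the 2.5GB in the doc example converts to exactly the byte value quoted: 2.5 GB = 2.5 × 1024 × 1024 × 1024 bytes = 2,684,354,560 bytes, which matches the 2684354560 above.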
| stmt.getBrokerDesc(), originStmt); | ||
| brokerLoadJob.setJobProperties(stmt.getProperties()); | ||
| brokerLoadJob.setDataSourceInfo(db, stmt.getDataDescriptions()); | ||
| brokerLoadJob.checkAndDataSourceInfo(db, stmt.getDataDescriptions()); |
checkAnd ?? Maybe checkAndCreate is better.
Changed to checkAndSetDataSourceInfo.
| txnState.addTableIndexes(table); | ||
| } | ||
| // submit all tasks together | ||
| for (LoadTask loadTask : idToTasks.values()) { |
There are some finished tasks in idToTasks which should not be submitted.
Fixed
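For readers following along, here is a minimal, self-contained sketch of the guard being discussed; LoadTask and its finished flag below are stand-ins for illustration, not Doris's actual API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of the guard discussed above: only tasks that are not yet
// finished get submitted. LoadTask here is a stand-in, not Doris's class.
public class SubmitPendingTasksSketch {
    static class LoadTask {
        final long id;
        final boolean finished;
        LoadTask(long id, boolean finished) { this.id = id; this.finished = finished; }
    }

    public static void main(String[] args) {
        Map<Long, LoadTask> idToTasks = new LinkedHashMap<>();
        idToTasks.put(1L, new LoadTask(1L, false));
        idToTasks.put(2L, new LoadTask(2L, true)); // already finished, must be skipped

        for (LoadTask task : idToTasks.values()) {
            if (task.finished) {
                continue; // do not re-submit a finished task
            }
            System.out.println("submitting task " + task.id);
        }
    }
}
```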
| private boolean isNegative; | ||
| private List<Long> partitionIds; | ||
| // this is a compatible param which only happens before the function of broker has been supported. | ||
| private List<String> fileFieldNames; |
Is fileFieldNames used by any other class?
Force-pushed from 10894a2 to db504f5
be/src/olap/schema_change.cpp (outdated)
| // _validate_alter_result should be outside the above while loop. | ||
| // to avoid requiring the header lock twice. | ||
| res = _validate_alter_result(new_tablet, request); |
If the previous step failed, this assignment will overwrite res and mask the failure.
Added a res check before this call.
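The control-flow point is that an unconditional assignment throws away any earlier failure. A rough sketch of the agreed fix, written in Java to match the other snippets in this thread (the real code is C++ in schema_change.cpp, and all names below are illustrative only):

```java
// Illustration only (the real code is C++ in schema_change.cpp): an earlier
// failure must not be overwritten by the later validation call.
public class PreserveEarlierErrorSketch {
    enum Status { OK, ERROR }

    // Stand-in for _validate_alter_result(new_tablet, request).
    static Status validateAlterResult() {
        return Status.OK;
    }

    public static void main(String[] args) {
        Status res = Status.ERROR; // pretend a previous step already failed

        // Guard added per the review: only validate if nothing has failed yet,
        // so a later OK cannot mask the earlier error.
        if (res == Status.OK) {
            res = validateAlterResult();
        }
        System.out.println("final status: " + res); // prints ERROR
    }
}
```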
| } | ||
| int rollupSchemaHash = Util.schemaHash(schemaVersion, rollupSchema, olapTable.getCopiedBfColumns(), | ||
| // get rollup schema hash | ||
| int rollupSchemaHash = Util.schemaHash(0 /* init schema version */, rollupSchema, olapTable.getCopiedBfColumns(), |
Don't use schema hash 0, because we do not know whether the BE uses this value.
This 0 is the schema version, not the schema hash. The schema hash is calculated inside this function.
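To make the distinction concrete, here is a toy sketch (not the real Util.schemaHash) showing that the first argument is a schema version that feeds into the hash, not the hash itself:

```java
import java.util.List;
import java.util.Objects;

// Illustration only: the first argument is a schema *version*, which is just
// one input to the hash; passing 0 does not make the resulting hash 0.
public class SchemaHashSketch {
    static int schemaHash(int schemaVersion, List<String> columns) {
        return Objects.hash(schemaVersion, columns); // hash computed from version + schema
    }

    public static void main(String[] args) {
        System.out.println(schemaHash(0, List.of("k1", "v1"))); // almost certainly not 0
    }
}
```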
| rollingUpIndex = rollupJob.getRollupIndex(partition.getId()); | ||
| List<MaterializedIndex> allIndices = null; | ||
| if (transactionState.getLoadedTblIndexes().isEmpty()) { |
Only stream load puts loaded indices, but dpp load does not, so dpp load will fail during the schema change process.
Fixed
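A small sketch of the kind of fallback suggested by the diff, with simplified types (the real classes are TransactionState and MaterializedIndex): an empty loaded-index list means all indexes of the table should be considered.

```java
import java.util.List;

// Sketch: if a load did not record which indexes it wrote to (e.g. dpp load),
// treat the empty list as "check all indexes of the table" instead of failing.
public class LoadedIndexesFallbackSketch {
    static List<String> indexesToCheck(List<String> loadedIndexes, List<String> allIndexes) {
        return loadedIndexes.isEmpty() ? allIndexes : loadedIndexes;
    }

    public static void main(String[] args) {
        List<String> all = List.of("base_index", "rollup_index");
        System.out.println(indexesToCheck(List.of(), all));              // [base_index, rollup_index]
        System.out.println(indexesToCheck(List.of("base_index"), all));  // [base_index]
    }
}
```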
| if (entry.getKey() <= endTransactionId) { | ||
| LOG.debug("find a running txn with txn_id={}, less than schema change txn_id {}", | ||
| entry.getKey(), endTransactionId); | ||
| LOG.info("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}", |
This should be checked at the tablet level, not at the db level.
This PR is already too big, so I will improve this in another PR. Issue #1724 tracks it.
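For context, the check in the diff is a watermark-style wait: the job picks a transaction id as a watermark and keeps waiting while any running transaction has an id at or below it. A hypothetical, simplified sketch (real Doris tracks far more state, and the reviewer's point is that per-tablet tracking would block less than per-db):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the watermark wait: any running txn with an id at or below the
// watermark means the schema change job must keep waiting.
public class TxnWatermarkSketch {
    static boolean hasRunningTxnBelowWatermark(Map<Long, String> runningTxns, long watermark) {
        for (long txnId : runningTxns.keySet()) {
            if (txnId <= watermark) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Long, String> running = new TreeMap<>();
        running.put(100L, "load_a");
        running.put(205L, "load_b");
        System.out.println(hasRunningTxnBelowWatermark(running, 200L)); // true: txn 100 still blocks
        System.out.println(hasRunningTxnBelowWatermark(running, 50L));  // false
    }
}
```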
yiguolei left a comment
Some comments
Force-pushed from 2b52e67 to 4effda0
| // eg: | ||
| // base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B' | ||
| // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = B); | ||
| List<Column> fullSchema = dstTable.getFullSchema(); |
There may be a problem when the input column mapping is (tmp_a, tmp_b, tmp_c, a = tmp_a, b = tmp_b, c = tmp_c). It's better to add the shadow column after all exprs are analyzed.
I changed it: __shadow_column will be mapped to whatever its base column maps to.
In your example, it will be __shadow_column = tmp_x.
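Spelling out the resolution with the reviewer's example: if the user mapping is (tmp_a, tmp_b, tmp_c, a = tmp_a, b = tmp_b, c = tmp_c) and b is under schema change, the shadow column reuses b's mapping, i.e. __doris_shadow_b = tmp_b. A tiny sketch of that rule, with expressions simplified to plain strings (the real code works on Expr objects):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: a shadow column copies whatever expression its base column maps to.
public class ShadowColumnMappingSketch {
    static final String SHADOW_PREFIX = "__doris_shadow_";

    public static void main(String[] args) {
        // user-specified mapping after analysis: target column -> source expression
        Map<String, String> columnExprs = new LinkedHashMap<>();
        columnExprs.put("a", "tmp_a");
        columnExprs.put("b", "tmp_b");
        columnExprs.put("c", "tmp_c");

        // column "b" is under schema change, so add its shadow column
        String base = "b";
        columnExprs.put(SHADOW_PREFIX + base, columnExprs.get(base));

        System.out.println(columnExprs);
        // {a=tmp_a, b=tmp_b, c=tmp_c, __doris_shadow_b=tmp_b}
    }
}
```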
| this.txnCommitAttachment = txnCommitAttachment; | ||
| } | ||
| public void addTableIndexes(OlapTable table) { |
Can this function be called by multiple threads? If so, you should make it thread-safe; otherwise, you should add a comment about it.
This will only be called before the transaction starts running, so only one thread can access it.
I will add a comment here.
| return columnExprDescs; | ||
| } | ||
| public void addColumnExprDesc(ImportColumnDesc columnExprDesc) { |
Better to add a comment on who will use this.
It is removed
| if (properties.containsKey(LoadStmt.TIMEZONE)) { | ||
| timezone = properties.get(LoadStmt.TIMEZONE); | ||
| } |
We can get it from the session variables if the properties don't have it.
fixed
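A small sketch of the property-then-session-variable fallback described above, with hypothetical accessors (only the LoadStmt.TIMEZONE property key comes from the diff):

```java
import java.util.Map;

// Sketch: prefer the job property, fall back to the session variable.
public class TimezoneFallbackSketch {
    static final String TIMEZONE = "timezone";

    static String resolveTimezone(Map<String, String> properties, String sessionTimezone) {
        return properties.getOrDefault(TIMEZONE, sessionTimezone);
    }

    public static void main(String[] args) {
        System.out.println(resolveTimezone(Map.of(), "Asia/Shanghai"));                // Asia/Shanghai
        System.out.println(resolveTimezone(Map.of(TIMEZONE, "UTC"), "Asia/Shanghai")); // UTC
    }
}
```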
| super(JobType.ROLLUP); | ||
| } | ||
| public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) { |
Does it need to be thread-safe?
No need, it is only accessed while the job is being created.
| * For hadoop load, this param is also used to persistence. | ||
| * The function in this param is copied from 'parsedColumnExprList' | ||
| */ | ||
| private Map<String, Pair<String, List<String>>> columnToHadoopFunction = Maps.newHashMap(); |
| private Map<String, Pair<String, List<String>>> columnToHadoopFunction = Maps.newHashMap(); | |
| private Map<String, Pair<String, List<String>>> columnToHadoopFunction = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); |
Done
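The effect of the suggested change is that column-name lookups become case-insensitive. A quick illustration using the JDK TreeMap directly, which is equivalent to what Guava's Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER) returns:

```java
import java.util.TreeMap;

// Case-insensitive keys: "k1" and "K1" hit the same entry.
public class CaseInsensitiveColumnMapSketch {
    public static void main(String[] args) {
        TreeMap<String, String> columnToFunction = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        columnToFunction.put("K1", "md5sum");

        System.out.println(columnToFunction.get("k1")); // md5sum
        System.out.println(columnToFunction.get("K1")); // md5sum
    }
}
```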
Force-pushed from 38d255a to 226548c
| "default_value", | ||
| "md5sum", | ||
| "replace_value", | ||
| "now", "hll_hash", |
One function per line?
OK
| TBrokerScanRangeParams params = context.params; | ||
| // there are no columns transform | ||
| List<String> sourceFileColumns = context.fileGroup.getFileFieldNames(); |
Why don't you analyze the sourceFileColumns and pathColumns in DataDescription? The originColumnNameToExprList is the final column expr list, including source file columns, path columns, and column exprs.
| } | ||
| List<Column> baseSchema = table.getBaseSchema(); | ||
| // fill the column info if user does not specify them | ||
| dataDescription.fillColumnInfoIfNotSpecified(baseSchema); |
The columns and parsedColumnExprList include the same columns.
No. In fillColumnInfoIfNotSpecified(), I just want to fill the fields which the user did not fill. After that, these fields should look as if the user had filled them, not like some "after-analyzed" results.
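A rough sketch of the "fill only what the user left out" behavior described above, with made-up names and String columns (the real fillColumnInfoIfNotSpecified works on Doris Column objects):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: if the user did not list any columns, default to the table's base schema,
// so downstream code sees the same shape as if the user had written them out.
public class FillColumnsIfNotSpecifiedSketch {
    static List<String> fillIfNotSpecified(List<String> userColumns, List<String> baseSchema) {
        return userColumns.isEmpty() ? new ArrayList<>(baseSchema) : userColumns;
    }

    public static void main(String[] args) {
        List<String> baseSchema = List.of("k1", "k2", "v1");
        System.out.println(fillIfNotSpecified(List.of(), baseSchema));           // [k1, k2, v1]
        System.out.println(fillIfNotSpecified(List.of("k1", "v1"), baseSchema)); // [k1, v1]
    }
}
```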
Force-pushed from fef75e6 to 80a1174
Force-pushed from 158a3ca to f7c34bd
Force-pushed from 8165d50 to 644d159
| slotDescByName.put(column.getName(), slotDesc); | ||
| } | ||
| } | ||
| boolean specifyFileFieldNames = streamLoadTask.getColumnExprDescs().stream().anyMatch(p -> p.isColumn()); |
why not put this in Load.initColumns?
OK
| // base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B' | ||
| // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B)); | ||
| for (Column column : tbl.getFullSchema()) { | ||
| if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { |
| if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { | |
| if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { | |
| continue; |
OK
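The suggestion above is the usual guard-clause inversion: skip non-shadow columns at the top of the loop instead of nesting the interesting branch. A generic illustration with simplified names (the real check is column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)):

```java
import java.util.List;

// Guard-clause style: bail out early for the common case, so the
// shadow-column handling is not wrapped in an extra level of nesting.
public class GuardClauseSketch {
    static final String SHADOW_PREFIX = "__doris_shadow_";

    public static void main(String[] args) {
        for (String column : List.of("k1", "__doris_shadow_k2", "v1")) {
            if (!column.startsWith(SHADOW_PREFIX)) {
                continue; // not a shadow column, nothing to do
            }
            System.out.println("handle shadow column: " + column);
        }
    }
}
```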
| import java.util.List; | ||
| /* | ||
| * Author: Chenmingyu |
Remove Author tag
ok
imay left a comment
LGTM
ISSUE #1613