-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Change the relationship between txn and task #703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change the relationship between txn and task #703
Conversation
EmmyMiao87
commented
Mar 7, 2019
- Check if properties is null before check routine load properties
- Change transactionStateChange reason to string
- calculate current num by beId
- Add kafka offset properties
- Perfer to use previous be id
- Add before commit listerner of txn: if txn is committed after task is aborted, commit will be aborted
- queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Perfer to use previous be id 6. Add before commit listerner of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
6c54104 to
08d3207
Compare
| kafkaOffsets = new ArrayList<>(); | ||
| String[] kafkaOffsetsStringList = customProperties.get(KAFKA_OFFSETS_PROPERTY).split(","); | ||
| for (String s : kafkaOffsetsStringList) { | ||
| kafkaOffsets.add(Long.valueOf(s)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should check the number format. And also check whether the number of specified offsets equals to number of specified partitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will do it later
| KafkaProgress newProgress = (KafkaProgress) progress; | ||
| newProgress.getPartitionIdToOffset().entrySet().parallelStream() | ||
| .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue())); | ||
| .forEach(entity -> partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why + 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The offset of txn is the end offset while the kafka offset of progress is begin offset . The next task will read data from this begin offset.
| } | ||
| String taskId = txnState.getLabel(); | ||
| if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) { | ||
| LOG.debug("there are a txn of routine load task will be aborted"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"there are a txn" ??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And better to add taskid in log
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId
1. Check if properties is null before check routine load properties 2. Change transactionStateChange reason to string 3. calculate current num by beId 4. Add kafka offset properties 5. Prefer to use previous be id 6. Add before commit listener of txn: if txn is committed after task is aborted, commit will be aborted 7. queryId of stream load plan = taskId