add interface to auto clean mysql pendingSegments table#3831
add interface to auto clean mysql pendingSegments table#3831haoxiang47 wants to merge 5 commits intoapache:masterfrom
Conversation
…aSegment.java to struct the task table’s data, add getNotActiveTask and deletePendingSegments in IndexerSQLMetadataStorageCoordinator.java to finish the delete logic, add test in TestIndexerMetadataStorageCoordinator.java.
deletePendingSegment’s sql -> “DELETE from %1s WHERE sequence_name LIKE '%2s%%” and format other code
…aSegment.java to struct the task table’s data, add getNotActiveTask and deletePendingSegments in IndexerSQLMetadataStorageCoordinator.java to finish the delete logic, add test in TestIndexerMetadataStorageCoordinator.java.
deletePendingSegment’s sql -> “DELETE from %1s WHERE sequence_name LIKE '%2s%%” and format other code
|
@KurtYoung can you take a look? |
| @@ -0,0 +1,94 @@ | |||
| package io.druid.timeline; | |||
There was a problem hiding this comment.
@haoxiang47 there are many whitespace changes, can we please revert them?
https://github.com/druid-io/druid/pull/3831/files?w=1 indicates there's only new code added
There was a problem hiding this comment.
| import java.util.Map; | ||
|
|
||
| /** | ||
| * Created by haoxiang on 16/11/11. |
There was a problem hiding this comment.
we generally dont have author info
| this.interval = interval; | ||
| } | ||
|
|
||
| /** |
| nuked.addAll(segments); | ||
| } | ||
|
|
||
| @Override |
| final private Set<DataSegment> published = Sets.newConcurrentHashSet(); | ||
| final private Set<DataSegment> nuked = Sets.newConcurrentHashSet(); | ||
| final private List<DataSegment> unusedSegments; | ||
| final private List<TaskDataSegment> taskDataSegments; |
| * @param interval Filter the tasks to ones that include tasks in this interval exclusively. Start is inclusive, end is exclusive | ||
| * @return The TaskDataSegment list which used to match the pendingSegment table's payload | ||
| */ | ||
| List<TaskDataSegment> getNotActiveTask(final Interval interval); |
| private final Interval interval; | ||
|
|
||
| @JsonCreator | ||
| public TaskDataSegment( |
There was a problem hiding this comment.
Are we sure this class is needed? What is the class used for serde when writing to pending Segments table?
|
@gianm is there any possible race condition where active=0 but we still need the segment? |
|
@fjy Yeah, i will take a look. @haoxiang47 thanks for your work, and i think it's better to keep the code style consistent with current project, it contains too many spaces and format changes, make it hard to see what exactly had been changed. |
|
Yeah thanks for review, I will keep the code style first and update my code, so excepet the code style, if there has any other problem? My logic will simplely like this: compare the druid_tasks and druid_pendingSegments, find the dataset which 'active' is 0 in druid_tasks and match the sequence_name in druid_pendingSegments and baseSequenceName in druid_tasks's payload ioConfig parameter which type is kafka. and then I have the question that where should this interface to action? maybe in overlord or kafka supervisior? |
For the issue 3565, I finished the interface to delete unused pendingSeglments, add TaskDataSegment.java to struct the task table’s data, add getNotActiveTask and deletePendingSegments in IndexerSQLMetadataStorageCoordinator.java to finish the delete logic, add test in TestIndexerMetadataStorageCoordinator.java.