Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public void testCheckSegments() throws IOException
1024
)
);
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
indexerMetadataStorageCoordinator.announceHistoricalSegments(derivativeSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException
1024
)
);
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testOptimize() throws InterruptedException
1024 * 1024
);
try {
metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment));
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
Expand All @@ -185,7 +185,7 @@ public void testOptimize() throws InterruptedException
1024
);
try {
metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment));
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,7 @@ public void testIsTransientException()
SQLServerConnector connector = new SQLServerConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
MetadataStorageTablesConfig.fromBase(null)
)
);

Expand All @@ -70,7 +58,7 @@ public void testLimitClause()
SQLServerConnector connector = new SQLServerConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
MetadataStorageTablesConfig.fromBase(null)
)
);
Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", connector.limitClause(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String getDriverClassName()
private static final Supplier<MetadataStorageConnectorConfig> CONNECTOR_CONFIG_SUPPLIER =
MetadataStorageConnectorConfig::new;
private static final Supplier<MetadataStorageTablesConfig> TABLES_CONFIG_SUPPLIER =
() -> new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null);
() -> MetadataStorageTablesConfig.fromBase(null);


@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,7 @@ public void testIsTransientException()
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
),
Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(null)),
new PostgreSQLConnectorConfig(),
new PostgreSQLTablesConfig()
);
Expand All @@ -68,9 +54,7 @@ public void testLimitClause()
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
),
Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(null)),
new PostgreSQLConnectorConfig(),
new PostgreSQLTablesConfig()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public TypeReference<Set<DataSegment>> getReturnTypeReference()

/**
* Behaves similarly to
* {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#announceHistoricalSegments},
* {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#commitSegments},
* with startMetadata and endMetadata both null.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Void perform(Task task, TaskActionToolbox toolbox)
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
.onValidLocks(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Void perform(Task task, TaskActionToolbox toolbox)
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
.onValidLocks(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata(
segments,
startMetadata,
endMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class LockRequestForNewSegment implements LockRequest
private final int priority;
private final String sequenceName;
@Nullable
private final String previsousSegmentId;
private final String previousSegmentId;
private final boolean skipSegmentLineageCheck;

private String version;
Expand All @@ -55,7 +55,7 @@ public LockRequestForNewSegment(
PartialShardSpec partialShardSpec,
int priority,
String sequenceName,
@Nullable String previsousSegmentId,
@Nullable String previousSegmentId,
boolean skipSegmentLineageCheck
)
{
Expand All @@ -67,7 +67,7 @@ public LockRequestForNewSegment(
this.partialShardSpec = partialShardSpec;
this.priority = priority;
this.sequenceName = sequenceName;
this.previsousSegmentId = previsousSegmentId;
this.previousSegmentId = previousSegmentId;
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
}

Expand All @@ -79,7 +79,7 @@ public LockRequestForNewSegment(
Interval interval,
PartialShardSpec partialShardSpec,
String sequenceName,
@Nullable String previsousSegmentId,
@Nullable String previousSegmentId,
boolean skipSegmentLineageCheck
)
{
Expand All @@ -92,7 +92,7 @@ public LockRequestForNewSegment(
partialShardSpec,
task.getPriority(),
sequenceName,
previsousSegmentId,
previousSegmentId,
skipSegmentLineageCheck
);
}
Expand Down Expand Up @@ -168,9 +168,9 @@ public String getSequenceName()
}

@Nullable
public String getPrevisousSegmentId()
public String getPreviousSegmentId()
{
return previsousSegmentId;
return previousSegmentId;
}

public boolean isSkipSegmentLineageCheck()
Expand All @@ -190,7 +190,7 @@ public String toString()
", partialShardSpec=" + partialShardSpec +
", priority=" + priority +
", sequenceName='" + sequenceName + '\'' +
", previsousSegmentId='" + previsousSegmentId + '\'' +
", previousSegmentId='" + previousSegmentId + '\'' +
", skipSegmentLineageCheck=" + skipSegmentLineageCheck +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
request.getSequenceName(),
request.getPrevisousSegmentId(),
request.getPreviousSegmentId(),
request.getInterval(),
request.getPartialShardSpec(),
version,
Expand All @@ -773,7 +773,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques
* @param intervals intervals
* @param action action to be performed inside of the critical section
*/
public <T> T doInCriticalSection(Task task, List<Interval> intervals, CriticalAction<T> action) throws Exception
public <T> T doInCriticalSection(Task task, Set<Interval> intervals, CriticalAction<T> action) throws Exception
{
giant.lock();

Expand All @@ -790,7 +790,7 @@ public <T> T doInCriticalSection(Task task, List<Interval> intervals, CriticalAc
* It doesn't check other semantics like acquired locks are enough to overwrite existing segments.
* This kind of semantic should be checked in each caller of {@link #doInCriticalSection}.
*/
private boolean isTaskLocksValid(Task task, List<Interval> intervals)
private boolean isTaskLocksValid(Task task, Set<Interval> intervals)
{
giant.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void setup() throws IOException
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "1"));

actionTestKit.getMetadataStorageCoordinator()
.announceHistoricalSegments(expectedUnusedSegments);
.commitSegments(expectedUnusedSegments);

expectedUnusedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

Expand All @@ -70,7 +70,7 @@ public void setup() throws IOException
expectedUsedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "2"));

actionTestKit.getMetadataStorageCoordinator()
.announceHistoricalSegments(expectedUsedSegments);
.commitSegments(expectedUsedSegments);

expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -639,7 +639,7 @@ public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exceptio
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -702,7 +702,7 @@ public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() thr
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -741,7 +741,7 @@ public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throw
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -780,7 +780,7 @@ public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -825,7 +825,7 @@ public void testWithPartialShardSpecAndOvershadowingSegments() throws IOExceptio

final ObjectMapper objectMapper = new DefaultObjectMapper();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSimple() throws Exception
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singletonList(INTERVAL),
Collections.singleton(INTERVAL),
CriticalAction.builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testFailBadVersion() throws Exception
thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
final Set<DataSegment> segments = actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singletonList(INTERVAL),
Collections.singleton(INTERVAL),
CriticalAction.<Set<DataSegment>>builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1507,9 +1507,9 @@ private void makeToolboxFactory(final File directory)
)
{
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException
public Set<DataSegment> commitSegments(Set<DataSegment> segments) throws IOException
{
Set<DataSegment> result = super.announceHistoricalSegments(segments);
Set<DataSegment> result = super.commitSegments(segments);

Assert.assertFalse(
"Segment latch not initialized, did you forget to call expectPublishSegments?",
Expand All @@ -1523,13 +1523,13 @@ public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) th
}

@Override
public SegmentPublishResult announceHistoricalSegments(
public SegmentPublishResult commitSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
) throws IOException
{
SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata);
SegmentPublishResult result = super.commitSegmentsAndMetadata(segments, startMetadata, endMetadata);

Assert.assertNotNull(
"Segment latch not initialized, did you forget to call expectPublishSegments?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testKill() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to rename announced to committed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure 👍🏻
If CI fails for any reason, I will include this change. Otherwise, I will include it in the other PR.


Assert.assertEquals(segments, announced);

Expand Down Expand Up @@ -114,7 +114,7 @@ public void testKillWithMarkUnused() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down Expand Up @@ -178,7 +178,7 @@ public void testKillBatchSizeOneAndLimit4() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down Expand Up @@ -223,7 +223,7 @@ public void testKillBatchSizeThree() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down
Loading