Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public void start() throws InterruptedException
break;
}

if (isServerViewInitialized) {
// lastFailure != 0L means exceptions happened before and there're some refresh work was not completed.
// so that even ServerView is initialized, we can't let broker complete initialization.
if (isServerViewInitialized && lastFailure == 0L) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is saying that we will mark as initialized when all of these are true:

(a) no refresh is needed
(b) the server view is initialized
(c) there wasn't a refresh that just failed

Sounds good to me - thanks for the fix!

Do you think you could add a test for this case too? Maybe something like DruidSchemaTest, but that creates a DruidSchema using a QueryLifecycleFactory where the first query fails fail, and then after that they start succeeding. We'd want to make sure that when awaitInitialization() returns, things are really initialized.

// Server view is initialized, but we don't need to do a refresh. Could happen if there are
// no segments in the system yet. Just mark us as initialized, then.
initialized.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public static void tearDownClass() throws IOException

private SpecificSegmentsQuerySegmentWalker walker = null;
private DruidSchema schema = null;
private DruidSchema schema2 = null;
private SegmentManager segmentManager;
private Set<String> segmentDataSourceNames;
private Set<String> joinableDataSourceNames;
Expand Down Expand Up @@ -269,6 +270,38 @@ protected DruidTable buildDruidTable(String dataSource)
}
};

schema2 = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
)
{

boolean throwException = true;
@Override
protected DruidTable buildDruidTable(String dataSource)
{
DruidTable table = super.buildDruidTable(dataSource);
buildTableLatch.countDown();
return table;
}

@Override
Set<SegmentId> refreshSegments(final Set<SegmentId> segments) throws IOException
{
if (throwException) {
throwException = false;
throw new RuntimeException("Query[xxxx] url[http://xxxx:8083/druid/v2/] timed out.");
} else {
return super.refreshSegments(segments);
}
}
};

schema.start();
schema.awaitInitialization();
}
Expand All @@ -289,6 +322,19 @@ public void testGetTableMap()
Assert.assertEquals(ImmutableSet.of("foo", "foo2"), tableMap.keySet());
}

@Test
public void testSchemaInit() throws InterruptedException
{
schema2.start();
schema2.awaitInitialization();
Map<String, Table> tableMap = schema2.getTableMap();
Assert.assertEquals(2, tableMap.size());
Assert.assertTrue(tableMap.containsKey("foo"));
Assert.assertTrue(tableMap.containsKey("foo2"));
schema2.stop();
}


@Test
public void testGetTableMapFoo()
{
Expand Down