Conversation
A flake on this same test happened in the checks for this PR:
Yes, this is due to
If the test is slow enough to hit the

Should we just use batch append to simplify the test and make it more deterministic (and faster)?
Actually, it's not due to this.
If that is the case, you could try increasing the

You could either just use batch append instead of a Kafka supervisor.

FYI, #19151 updates the
Updated to use an index task instead of Kafka, PTAL!
kfaraz left a comment:
Minor non-blocking suggestions.
.map(DataSegment::getId)
.collect(Collectors.toSet());
ITRetryUtil.retryUntilEquals(
We should wait for a Broker metric instead.
Does cluster.callApi().waitForSegmentsToBeAvailable() not work for this case?
() ->
    broker.bindings()
          .getInstance(BrokerServerView.class)
          .getTimeline(TableDataSource.create(dataSource))
Instead of querying the timeline directly, please use SELECT id FROM sys.segments.
.map(DataSegment::getId)
.collect(Collectors.toSet());
ITRetryUtil.retryUntilEquals(
Instead of ITRetryUtil, try using cluster.callApi().waitForResult().
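For context, the generic poll-until-equal loop that a retryUntilEquals-style helper implements is roughly the following (a minimal self-contained sketch; the class name, method signature, and timings here are illustrative, not the actual ITRetryUtil or cluster.callApi() APIs from the Druid test framework):

```java
import java.util.function.Supplier;

public class RetryUntilEquals
{
  // Polls the supplier until its result equals the expected value,
  // sleeping between attempts; gives up after maxAttempts tries.
  static <T> boolean retryUntilEquals(
      Supplier<T> supplier,
      T expected,
      int maxAttempts,
      long sleepMillis
  ) throws InterruptedException
  {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (expected.equals(supplier.get())) {
        return true;
      }
      Thread.sleep(sleepMillis);
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException
  {
    // Simulated condition that becomes true on the third poll.
    final int[] polls = {0};
    final boolean reached = retryUntilEquals(() -> ++polls[0], 3, 10, 1);
    System.out.println(reached); // prints "true"
  }
}
```

The framework helper suggested above would encapsulate this same loop along with the cluster-specific wiring, which is why it is preferred over open-coding the retry in each test.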
kafkaServer.produceRecordsWithoutTransaction(producerRecords);
}
return producerRecords.size();
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
Is the large number of records crucial for this test?
If not, you could try using some of the templates from MoreResources, such as MoreResources.Task.BASIC_INDEX, MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS, or MoreResources.MSQ.INSERT_TINY_WIKI_JSON.
For a large dataset (wikipedia 1 day = 24k rows), you could also try the following (from IngestionSmokeTest.test_runIndexParallelTask_andCompactData()):
final String taskId = IdUtils.getRandomId();
final ParallelIndexSupervisorTask task = TaskBuilder
    .ofTypeIndexParallel()
    .timestampColumn("timestamp")
    .jsonInputFormat()
    .inputSource(Resources.HttpData.wikipedia1Day())
    .dimensions()
    .tuningConfig(t -> t.withMaxNumConcurrentSubTasks(1))
    .dataSource(dataSource)
    .withId(taskId);
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
cluster.callApi().waitForTaskToSucceed(taskId, eventCollector.latchableEmitter());
fix flaky test: BrokerServerView before querying for total rows

This PR has: