File: KafkaIndexTask.java
@@ -131,18 +131,20 @@ protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
{
if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
&& ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
+ //noinspection unchecked
return new IncrementalPublishingKafkaIndexTaskRunner(
this,
- parser,
+ dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory
);
} else {
+ //noinspection unchecked
return new LegacyKafkaIndexTaskRunner(
this,
- parser,
+ dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
File: KafkaIndexTaskTest.java
@@ -195,7 +195,7 @@
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
- private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ private static final ObjectMapper OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
private static final long POLL_RETRY_MS = 100;

private static TestingCluster zkServer;
@@ -204,6 +204,10 @@ public class KafkaIndexTaskTest
private static ListeningExecutorService taskExec;
private static int topicPostfix;

+ static {
+ new KafkaIndexTaskModule().getJacksonModules().forEach(OBJECT_MAPPER::registerModule);
+ }

private final List<Task> runningTasks = new ArrayList<>();

private long handoffConditionTimeout = 0;
@@ -244,7 +248,7 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)

private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
- objectMapper.convertValue(
+ OBJECT_MAPPER.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
@@ -272,7 +276,7 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
- objectMapper
+ OBJECT_MAPPER
);

private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
@@ -730,10 +734,11 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
SegmentDescriptor desc7 = sd(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
- new KafkaDataSourceMetadata(
- new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
- ),
- metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
+ new KafkaDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 2L))
+ ),
+ metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+ );

Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
@@ -2011,7 +2016,7 @@ public void testRunWithPauseAndResume() throws Exception
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());

- Map<Integer, Long> currentOffsets = objectMapper.readValue(
+ Map<Integer, Long> currentOffsets = OBJECT_MAPPER.readValue(
task.getRunner().pause().getEntity().toString(),
new TypeReference<Map<Integer, Long>>()
{
@@ -2147,7 +2152,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
final Map<String, Object> context = new HashMap<>();
context.put(
SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
- objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+ OBJECT_MAPPER.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
);

final KafkaIndexTask task = createTask(
@@ -2267,7 +2272,7 @@ public void testRunTransactionModeRollback() throws Exception
Assert.assertEquals(Status.READING, task.getRunner().getStatus());

//verify the 2 indexed records
- final QuerySegmentSpec firstInterval = objectMapper.readValue(
+ final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
"\"2008/2010\"", QuerySegmentSpec.class
);
Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
@@ -2287,7 +2292,7 @@ public void testRunTransactionModeRollback() throws Exception
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());

- final QuerySegmentSpec rollbackedInterval = objectMapper.readValue(
+ final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
"\"2010/2012\"", QuerySegmentSpec.class
);
scanResultValues = scanData(task, rollbackedInterval);
@@ -2304,7 +2309,7 @@ public void testRunTransactionModeRollback() throws Exception
kafkaProducer.commitTransaction();
}

- final QuerySegmentSpec endInterval = objectMapper.readValue(
+ final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
"\"2008/2049\"", QuerySegmentSpec.class
);
Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
@@ -2388,6 +2393,36 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
}

+ @Test
+ public void testSerde() throws Exception
+ {
+ // This is both a serde test and a regression test for https://github.com/apache/incubator-druid/issues/7724.
+
+ final KafkaIndexTask task = createTask(
+ "taskid",
+ DATA_SCHEMA.withTransformSpec(
+ new TransformSpec(
+ null,
+ ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
+ )
+ ),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence",
+ new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()),
+ ImmutableMap.of(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null
+ )
+ );
+
+ final Task task1 = OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsBytes(task), Task.class);
+ Assert.assertEquals(task, task1);
+ }

private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
@@ -2513,7 +2548,7 @@ private KafkaIndexTask createTask(
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
- final String checkpointsJson = objectMapper
+ final String checkpointsJson = OBJECT_MAPPER
.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints);
context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
@@ -2530,7 +2565,7 @@ private KafkaIndexTask createTask(
null,
null,
rowIngestionMetersFactory,
- objectMapper
+ OBJECT_MAPPER
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@@ -2544,7 +2579,7 @@ private static DataSchema cloneDataSchema(final DataSchema dataSchema)
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
- objectMapper
+ OBJECT_MAPPER
);
}

@@ -2861,7 +2896,7 @@ private SegmentDescriptor sd(final Task task, final String intervalString, final int partitionNum)

private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
{
- Map<String, TaskReport> taskReports = objectMapper.readValue(
+ Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
reportsFile,
new TypeReference<Map<String, TaskReport>>()
{
File: KinesisIndexTask.java
@@ -72,9 +72,10 @@ public KinesisIndexTask(
@Override
protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
{
+ //noinspection unchecked
return new KinesisIndexTaskRunner(
this,
- parser,
+ dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
File: SeekableStreamIndexTask.java
@@ -25,7 +25,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
- import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
@@ -57,7 +56,6 @@
import org.apache.druid.utils.CircularBuffer;

import javax.annotation.Nullable;
- import java.nio.ByteBuffer;
import java.util.Map;


@@ -67,9 +65,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);

- private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
protected final DataSchema dataSchema;
- protected final InputRowParser<ByteBuffer> parser;
protected final SeekableStreamIndexTaskTuningConfig tuningConfig;
protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
protected final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -78,6 +74,12 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
protected final RowIngestionMetersFactory rowIngestionMetersFactory;
protected final CircularBuffer<Throwable> savedParseExceptions;

+ // Lazily initialized, to avoid calling createTaskRunner() on the overlord when tasks are instantiated.
+ // See https://github.com/apache/incubator-druid/issues/7724 for the problems this can cause.
+ // Initialization is synchronized because the runner may be needed from multiple threads.
+ private final Object runnerInitLock = new Object();
+ private volatile SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;

public SeekableStreamIndexTask(
final String id,
@Nullable final TaskResource taskResource,
@@ -99,7 +101,6 @@ public SeekableStreamIndexTask(
context
);
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
- this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@@ -111,7 +112,6 @@ public SeekableStreamIndexTask(
this.context = context;
this.authorizerMapper = authorizerMapper;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
- this.runner = createTaskRunner();
}

private static String makeTaskId(String dataSource, String type)
@@ -130,7 +130,6 @@ protected static String getFormattedGroupId(String dataSource, String type)
return StringUtils.format("%s_%s", type, dataSource);
}


@Override
public int getPriority()
{
@@ -164,7 +163,7 @@ public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIOConfig()
@Override
public TaskStatus run(final TaskToolbox toolbox)
{
- return runner.run(toolbox);
+ return getRunner().run(toolbox);
}

@Override
@@ -177,19 +176,19 @@ public boolean canRestore()
public void stopGracefully(TaskConfig taskConfig)
{
if (taskConfig.isRestoreTasksOnRestart()) {
- runner.stopGracefully();
+ getRunner().stopGracefully();
}
}

@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
- if (runner.getAppenderator() == null) {
+ if (getRunner().getAppenderator() == null) {
// Not yet initialized, no data yet, just return a noop runner.
return new NoopQueryRunner<>();
}

- return (queryPlus, responseContext) -> queryPlus.run(runner.getAppenderator(), responseContext);
+ return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
}

public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
@@ -283,13 +282,20 @@ public boolean withinMinMaxRecordTime(final InputRow row)
@VisibleForTesting
public Appenderator getAppenderator()
{
- return runner.getAppenderator();
+ return getRunner().getAppenderator();
}

@VisibleForTesting
public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
{
+ if (runner == null) {
+ synchronized (runnerInitLock) {
+ if (runner == null) {
+ runner = createTaskRunner();
+ }
+ }
+ }
return runner;
}

}

Review comments on the lazy initialization in getRunner():

Contributor: and here I thought that with all the Java 8+ fanciness we would have a method like AtomicReference<T>.getOrCreate(Supplier<T>), but it seems this is still the way to do a lazy singleton.

Member: guava has Suppliers.memoize that does this, we use it in a couple of places already

Contributor: that sounds nice, now I remember seeing that :)
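For illustration, here is a minimal sketch of the Suppliers.memoize alternative mentioned in the thread above. LazyRunnerTask and TaskRunner are hypothetical stand-ins, not actual Druid classes; the point is only that memoize provides the same lazy, thread-safe, at-most-once initialization as the hand-rolled double-checked locking in this patch.

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

// Hypothetical stand-in for SeekableStreamIndexTaskRunner.
class TaskRunner
{
}

// Sketch: lazy runner initialization via Guava's Suppliers.memoize.
// memoize() wraps the delegate so it is invoked at most once, on the first
// call to get(), and the result is cached. Until then createTaskRunner()
// never runs, so merely deserializing the task (e.g. on the overlord)
// would not trigger it.
class LazyRunnerTask
{
  private final Supplier<TaskRunner> runner = Suppliers.memoize(this::createTaskRunner);

  private TaskRunner createTaskRunner()
  {
    return new TaskRunner();
  }

  public TaskRunner getRunner()
  {
    return runner.get();
  }
}

Compared to the runnerInitLock pattern in the diff, this removes the volatile field and the nested null checks, at the cost of holding the runner behind a Supplier.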