Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
08238bc
Support kafka transactional topics
Oct 19, 2018
b584bf5
Remove unused import from test
Oct 20, 2018
cb50ef7
Merge branch 'master' of github.com:druid-io/druid into transactional…
Oct 20, 2018
6ab64f6
Merge branch 'master' of github.com:druid-io/druid into transactional…
Oct 22, 2018
7f1f2ea
Merge branch 'master' of github.com:druid-io/druid into transactional…
Oct 26, 2018
4a94fd6
Merge branch 'master' of github.com:druid-io/druid into transactional…
Nov 13, 2018
d2a132f
Fix compilation
Nov 13, 2018
0ce12e6
Invoke transaction api to fix a unit test
Nov 16, 2018
bad28a0
Merge branch 'master' of github.com:druid-io/druid into transactional…
Nov 16, 2018
868ef80
temporary modification of travis.yml for debugging
Nov 16, 2018
d7c790d
Merge branch 'master' of github.com:druid-io/druid into transactional…
Nov 16, 2018
c1a9ce5
another attempt to get travis tasklogs
Nov 16, 2018
39d946d
update kafka to 2.0.1 at all places
Nov 17, 2018
a41cabf
Merge branch 'master' of github.com:druid-io/druid into transactional…
Nov 17, 2018
7cb28d3
Remove druid-kafka-eight dependency from integration-tests, remove th…
Nov 19, 2018
bd68153
Merge branch 'master' of github.com:druid-io/druid into transactional…
Nov 19, 2018
97cd9ef
Add deprecated in docs for kafka-eight and kafka-simple extensions
Nov 20, 2018
ae91514
Merge branch 'master' of github.com:druid-io/druid into transactional…
Jan 2, 2019
7b72c8d
Remove skipOffsetGaps and code changes for transaction support
Jan 5, 2019
2c6f5c0
Merge branch 'master' of github.com:druid-io/druid into transactional…
Jan 5, 2019
d1f06f0
Fix indentation
Jan 5, 2019
c89ddfb
remove skipOffsetGaps from kinesis
Jan 5, 2019
d6d5b32
Add transaction api to KafkaRecordSupplierTest
Jan 6, 2019
e411d68
Fix indent
Jan 6, 2019
4f16ebe
Merge branch 'master' of github.com:druid-io/druid into transactional…
Jan 30, 2019
9445d92
Fix test
Feb 4, 2019
e5e2326
Merge branch 'master' of github.com:druid-io/druid into transactional…
Feb 4, 2019
581a33a
update kafka version to 2.1.0
Feb 4, 2019
c1a572b
Merge branch 'master' of github.com:druid-io/druid into transactional…
Feb 14, 2019
f632b1e
Merge branch 'master' of github.com:druid-io/druid into transactional…
Feb 14, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ For Roaring bitmaps:
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)|

## Operations

Expand Down
4 changes: 2 additions & 2 deletions docs/content/development/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Core extensions are maintained by Druid committers.
|druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)|
|druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
|druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|
|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes.|[link](../development/extensions-core/kafka-eight-firehose.html)|
|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes (deprecated).|[link](../development/extensions-core/kafka-eight-firehose.html)|
|druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
|druid-kafka-indexing-service|Supervised exactly-once Kafka ingestion for the indexing service.|[link](../development/extensions-core/kafka-ingestion.html)|
|druid-kinesis-indexing-service|Supervised exactly-once Kinesis ingestion for the indexing service.|[link](../development/extensions-core/kinesis-ingestion.html)|
Expand Down Expand Up @@ -81,7 +81,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)|
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer) (deprecated).|[link](../development/extensions-contrib/kafka-simple.html)|
|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.List;

@Deprecated
public class KafkaEightSimpleConsumerDruidModule implements DruidModule
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Deprecated
public class KafkaEightSimpleConsumerFirehoseFactory implements
FirehoseFactoryV2<ByteBufferInputRowParser>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* This class is not thread safe, the caller must ensure all the methods be
* called from single thread
*/
@Deprecated
public class KafkaSimpleConsumer
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

/**
*/
@Deprecated
public class KafkaEightDruidModule implements DruidModule
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import java.util.Set;

/**
 * This class is deprecated and the kafka-eight module should be removed completely.
 */
@Deprecated
public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>>
{
private static final Logger log = new Logger(KafkaEightFirehoseFactory.class);
Expand Down
2 changes: 1 addition & 1 deletion extensions-core/kafka-indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</parent>

<properties>
<apache.kafka.version>0.10.2.2</apache.kafka.version>
<apache.kafka.version>2.1.0</apache.kafka.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ KafkaConsumer<byte[], byte[]> newConsumer()
props.setProperty("auto.offset.reset", "none");
props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
props.setProperty("isolation.level", "read_committed");

return new KafkaConsumer<>(props);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime
)
{
super(
Expand All @@ -57,7 +56,6 @@ public KafkaIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
skipOffsetGaps,
null
);

Expand Down Expand Up @@ -100,7 +98,6 @@ public String toString()
", useTransaction=" + isUseTransaction() +
", minimumMessageTime=" + getMinimumMessageTime() +
", maximumMessageTime=" + getMaximumMessageTime() +
", skipOffsetGaps=" + isSkipOffsetGaps() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -110,7 +111,7 @@ public Set<StreamPartition<Integer>> getAssignment()
public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
{
List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : consumer.poll(timeout)) {
for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
polledRecords.add(new OrderedPartitionableRecord<>(
record.topic(),
record.partition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,23 +413,6 @@ public void run()
}

if (record.offset() < endOffsets.get(record.partition())) {
if (record.offset() != nextOffsets.get(record.partition())) {
if (ioConfig.isSkipOffsetGaps()) {
log.warn(
"Skipped to offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
} else {
throw new ISE(
"WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
}
}

try {
final byte[] valueBytes = record.value();
Expand Down Expand Up @@ -489,7 +472,7 @@ public void run()
nextOffsets.put(record.partition(), record.offset() + 1);
}

if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))
if (nextOffsets.get(record.partition()) >= (endOffsets.get(record.partition()))
&& assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
KafkaIndexTask.assignPartitions(consumer, topic, assignment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
kafkaIoConfig.getPollTimeout(),
true,
minimumMessageTime,
maximumMessageTime,
kafkaIoConfig.isSkipOffsetGaps()
maximumMessageTime
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig

private final Map<String, Object> consumerProperties;
private final long pollTimeout;
private final boolean skipOffsetGaps;


@JsonCreator
public KafkaSupervisorIOConfig(
Expand All @@ -53,8 +53,7 @@ public KafkaSupervisorIOConfig(
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@JsonProperty("completionTimeout") Period completionTimeout,
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod
)
{
super(
Expand All @@ -76,7 +75,6 @@ public KafkaSupervisorIOConfig(
StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
);
this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
}

@JsonProperty
Expand All @@ -103,12 +101,6 @@ public boolean isUseEarliestOffset()
return isUseEarliestSequenceNumber();
}

@JsonProperty
public boolean isSkipOffsetGaps()
{
return skipOffsetGaps;
}

@Override
public String toString()
{
Expand All @@ -125,7 +117,6 @@ public String toString()
", completionTimeout=" + getCompletionTimeout() +
", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
", skipOffsetGaps=" + skipOffsetGaps +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertTrue(config.isUseTransaction());
Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
Assert.assertEquals(Collections.EMPTY_SET, config.getExclusiveStartSequenceNumberPartitions());
}

Expand All @@ -93,8 +92,7 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
+ " \"skipOffsetGaps\": true\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";

KafkaIndexTaskIOConfig config = (KafkaIndexTaskIOConfig) mapper.readValue(
Expand All @@ -115,9 +113,7 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertFalse(config.isUseTransaction());
Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
Assert.assertEquals(Collections.EMPTY_SET, config.getExclusiveStartSequenceNumberPartitions());

}

@Test
Expand Down
Loading