Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5d3ca71
KAFKA-5295: Add tests first
kkonstantine May 5, 2020
1bab224
KAFKA-5295: Add configuration for topic creation by source connectors
kkonstantine May 9, 2020
cff6302
KAFKA-5295: Adjust tests to new source connector configs
kkonstantine May 9, 2020
bf250eb
KAFKA-5295: Implement topic creation by tasks based on configuration …
kkonstantine May 11, 2020
11d9e64
KAFKA-5295: Adapt existing tests
kkonstantine May 12, 2020
3e25d6d
KAFKA-5295: Adapt existing integration tests
kkonstantine May 22, 2020
05b48eb
KAFKA-5295: Adjust checkstyle
kkonstantine May 14, 2020
91b6d4e
KAFKA-5295: Tests with topic creation enabled
kkonstantine May 14, 2020
435ca34
KAFKA-5295: New tests
kkonstantine May 22, 2020
800a40f
KAFKA-5295: Keep connectors that don't use topic creation unaffected
kkonstantine May 23, 2020
1e98c70
KAFKA-5295: Upgrade integration test
kkonstantine May 23, 2020
f58be22
KAFKA-5295: Minor cleanup
kkonstantine May 24, 2020
958cbe5
KAFKA-5295: Remove redundant code
kkonstantine May 24, 2020
0f90de4
KAFKA-5295: Maximize coverage for WorkerSourceTask
kkonstantine May 24, 2020
4840a6b
KAFKA-5295: Unit tests for SourceConnectorConfig
kkonstantine May 24, 2020
98f7376
KAFKA-5295: Address Randall's comments
kkonstantine May 25, 2020
d5b6d26
KAFKA-5295: Adapt tests after the changes of KAFKA-4794/KIP-131
kkonstantine May 25, 2020
9fa3e09
Fix doc in WorkerConfig
kkonstantine May 25, 2020
e73395b
KAFKA-5295: Set the id for the admin client used by the DLQ
kkonstantine May 25, 2020
645e2b2
KAFKA-5295: Refactor TopicCreation under the util package
kkonstantine May 25, 2020
b8da7c5
KAFKA-5295: Refactor TopicCreationGroup under its own class in util p…
kkonstantine May 25, 2020
a880d6e
KAFKA-5295: Add isTopicCreationRequired and addTopic in TopicCreation…
kkonstantine May 26, 2020
79fdf99
KAFKA-5295: Refactor and add TopicCreation unit tests
kkonstantine May 26, 2020
f57a9d6
Update REPLICATION_FACTOR_DOC
kkonstantine May 26, 2020
e61a8a4
Apply doc suggestions from code review
kkonstantine May 26, 2020
540cd0c
KAFKA-5295: Address minor comments from code review
kkonstantine May 26, 2020
83ac913
KAFKA-5295: Fix build and last minor typos
kkonstantine May 26, 2020
9850535
KAFKA-5295: Fix test after change in validation error
kkonstantine May 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 17 additions & 31 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,13 @@

<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
files="DistributedHerder(|Test).java"/>
<suppress checks="ClassFanOutComplexity"
files="Worker.java"/>
files="(DistributedHerder|Worker).java"/>

<suppress checks="MethodLength"
files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
files="(KafkaConfigBackingStore|Values).java"/>

<suppress checks="ParameterNumber"
files="(WorkerSinkTask|WorkerSourceTask).java"/>
<suppress checks="ParameterNumber"
files="WorkerCoordinator.java"/>
files="Worker(SinkTask|SourceTask|Coordinator).java"/>
<suppress checks="ParameterNumber"
files="ConfigKeyInfo.java"/>

Expand All @@ -119,40 +116,27 @@
files="JsonConverter.java"/>

<suppress checks="CyclomaticComplexity"
files="ConnectRecord.java"/>
<suppress checks="CyclomaticComplexity"
files="JsonConverter.java"/>
<suppress checks="CyclomaticComplexity"
files="FileStreamSourceTask.java"/>
<suppress checks="CyclomaticComplexity"
files="DistributedHerder.java"/>
<suppress checks="CyclomaticComplexity"
files="KafkaConfigBackingStore.java"/>
files="(FileStreamSourceTask|DistributedHerder|KafkaConfigBackingStore).java"/>
<suppress checks="CyclomaticComplexity"
files="(Values|ConnectHeader|ConnectHeaders).java"/>
<suppress checks="CyclomaticComplexity"
files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/>
files="(ConnectRecord|JsonConverter|Values|ConnectHeader|ConnectHeaders).java"/>

<suppress checks="JavaNCSS"
files="KafkaConfigBackingStore.java"/>
<suppress checks="JavaNCSS"
files="Values.java"/>

<suppress checks="NPathComplexity"
files="(DistributedHerder|RestClient|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|TopicAdmin).java"/>

<suppress checks="MethodLength"
files="Values.java"/>
files="(KafkaConfigBackingStore|Values).java"/>

<suppress checks="NPathComplexity"
files="RestServer.java"/>
files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|TopicAdmin).java"/>

<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"
files="(DistributedHerder|KafkaBasedLog)Test.java"/>
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation)Test.java"/>

<suppress checks="ClassFanOutComplexity"
files="(WorkerSinkTask|WorkerSourceTask)Test.java"/>
files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="DistributedHerderTest.java"/>

<suppress checks="MethodLength"
files="(RequestResponse|WorkerSinkTask)Test.java"/>

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
Expand Down Expand Up @@ -225,6 +209,8 @@
files="KStreamKStreamJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>
<suppress checks="CyclomaticComplexity"
files="RelationalSmokeTest.java|SmokeTestDriver.java"/>
<suppress checks="CyclomaticComplexity"
files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/>

<suppress checks="JavaNCSS"
files="KStreamKStreamJoinTest.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,167 @@
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.isolation.Plugins;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;

public class SourceConnectorConfig extends ConnectorConfig {

private static ConfigDef config = ConnectorConfig.configDef();
protected static final String TOPIC_CREATION_GROUP = "Topic Creation";

public static final String TOPIC_CREATION_PREFIX = "topic.creation.";

public static final String TOPIC_CREATION_GROUPS_CONFIG = TOPIC_CREATION_PREFIX + "groups";
private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+ "created by source connectors";
private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";

private static class EnrichedSourceConnectorConfig extends AbstractConfig {
EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> props) {
super(configDef, props);
}

@Override
public Object get(String key) {
return super.get(key);
}
}

private static ConfigDef config = SourceConnectorConfig.configDef();
private final EnrichedSourceConnectorConfig enrichedSourceConfig;

public static ConfigDef configDef() {
return config;
int orderInGroup = 0;
return new ConfigDef(ConnectorConfig.configDef())
.define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
(name, value) -> {
List<?> groupAliases = (List<?>) value;
if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
throw new ConfigException(name, value, "Duplicate alias provided.");
}
},
() -> "unique topic creation groups")),
ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, TOPIC_CREATION_GROUP,
++orderInGroup, ConfigDef.Width.LONG, TOPIC_CREATION_GROUPS_DISPLAY);
}

public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
String defaultGroup = "default";
ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, TopicCreationConfig.defaultGroupConfigDef());
return newDefaultDef;
}
Comment thread
rhauch marked this conversation as resolved.
Outdated

/**
* Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
*
* @param baseConfigDef the base configuration definition to be enriched
* @param props the non parsed configuration properties
* @return the enriched configuration definition
*/
public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, AbstractConfig defaultGroupConfig) {
List<Object> topicCreationGroups = new ArrayList<>();
Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
if (aliases instanceof List) {
topicCreationGroups.addAll((List<?>) aliases);
}

ConfigDef newDef = new ConfigDef(baseConfigDef);
String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
int defaultGroupPartitions = defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
topicCreationGroups.stream().distinct().forEach(group -> {
if (!(group instanceof String)) {
throw new ConfigException("Item in " + TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
}
String alias = (String) group;
String prefix = TOPIC_CREATION_PREFIX + alias + ".";
String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
newDef.embed(prefix, configGroup, 0,
TopicCreationConfig.configDef(configGroup, defaultGroupReplicationFactor, defaultGroupPartitions));
});
return newDef;
}

public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) {
super(plugins, config, props);
if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
ConfigDef defaultConfigDef = embedDefaultGroup(config);
// This config is only used to set default values for partitions and replication
// factor from the default group and otherwise it remains unused
AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, props, false);

// If the user has added regex of include or exclude patterns in the default group,
// they should be ignored.
Comment on lines +126 to +127
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TDD :)

Map<String, String> propsWithoutRegexForDefaultGroup = new HashMap<>(props);
propsWithoutRegexForDefaultGroup.entrySet()
.removeIf(e -> e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + INCLUDE_REGEX_CONFIG)
|| e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + EXCLUDE_REGEX_CONFIG));
enrichedSourceConfig = new EnrichedSourceConnectorConfig(enrich(defaultConfigDef, props,
defaultGroup), propsWithoutRegexForDefaultGroup);
} else {
enrichedSourceConfig = null;
}
}

@Override
public Object get(String key) {
return enrichedSourceConfig != null ? enrichedSourceConfig.get(key) : super.get(key);
}

/**
* Returns whether this configuration uses topic creation properties.
*
* @return true if the configuration should be validated and used for topic creation; false otherwise
*/
public boolean usesTopicCreation() {
return enrichedSourceConfig != null;
}

public List<String> topicCreationInclude(String group) {
return getList(TOPIC_CREATION_PREFIX + group + '.' + INCLUDE_REGEX_CONFIG);
}

public List<String> topicCreationExclude(String group) {
return getList(TOPIC_CREATION_PREFIX + group + '.' + EXCLUDE_REGEX_CONFIG);
}

public Short topicCreationReplicationFactor(String group) {
return getShort(TOPIC_CREATION_PREFIX + group + '.' + REPLICATION_FACTOR_CONFIG);
}

public Integer topicCreationPartitions(String group) {
return getInt(TOPIC_CREATION_PREFIX + group + '.' + PARTITIONS_CONFIG);
}

public Map<String, Object> topicCreationOtherConfigs(String group) {
if (enrichedSourceConfig == null) {
return Collections.emptyMap();
}
return enrichedSourceConfig.originalsWithPrefix(TOPIC_CREATION_PREFIX + group + '.').entrySet().stream()
.filter(e -> {
String key = e.getKey();
return !(INCLUDE_REGEX_CONFIG.equals(key) || EXCLUDE_REGEX_CONFIG.equals(key)
|| REPLICATION_FACTOR_CONFIG.equals(key) || PARTITIONS_CONFIG.equals(key));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static void main(String[] args) {
Expand Down
Loading