Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public Optional<Integer> numberOfPartitions() {

public void setNumberOfPartitions(final int numberOfPartitions) {
if (hasEnforcedNumberOfPartitions()) {
throw new UnsupportedOperationException("number of partitions are enforced on topic " +
"" + name() + " and can't be altered.");
throw new UnsupportedOperationException("number of partitions are enforced on topic " + name() + " and can't be altered.");
}

validateNumberOfPartitions(numberOfPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ public String toString() {
*/
@Override
public String toString(final String indent) {
final StringBuilder sb = new StringBuilder(super.toString(indent));
sb.append(indent).append("\ttopic:\t\t");
sb.append(topicExtractor);
sb.append("\n");
return sb.toString();
return super.toString(indent) + indent + "\ttopic:\t\t" +
topicExtractor +
"\n";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,10 @@ public String toString() {
}

public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(indent).append("GlobalMetadata: ").append(allMetadata).append("\n");
builder.append(indent).append("GlobalStores: ").append(globalStores).append("\n");
builder.append(indent).append("My HostInfo: ").append(thisHost).append("\n");
builder.append(indent).append("PartitionsByTopic: ").append(partitionsByTopic).append("\n");

return builder.toString();
return indent + "GlobalMetadata: " + allMetadata + "\n" +
indent + "GlobalStores: " + globalStores + "\n" +
indent + "My HostInfo: " + thisHost + "\n" +
indent + "PartitionsByTopic: " + partitionsByTopic + "\n";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class MeteredSessionStore<K, V>
private InternalProcessorContext<?, ?> context;
private TaskId taskId;

private LongAdder numOpenIterators = new LongAdder();
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class MeteredWindowStore<K, V>
private InternalProcessorContext<?, ?> context;
private TaskId taskId;

private LongAdder numOpenIterators = new LongAdder();
private NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
private final LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));

@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public String name() {
public KeyValueStore<Bytes, byte[]> get() {
return new KeyValueStore<Bytes, byte[]>() {
private boolean open = false;
private Map<Bytes, byte[]> map = new HashMap<>();
private final Map<Bytes, byte[]> map = new HashMap<>();
private Position position;
private StateStoreContext context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -178,7 +179,7 @@ public void shouldRecoverBufferAfterShutdown(final String processingGuarantee, f
// flush those recovered buffered events out.
produceSynchronouslyToPartitionZero(
streamInput,
asList(
Collections.singletonList(
new KeyValueTimestamp<>("k6", "v6", scaledTime(20L))
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static void startCluster() throws IOException, InterruptedException {
new KeyValue<>("ID123-4", "ID123-A4")
);

final List<KeyValue<String, String>> table2 = asList(
final List<KeyValue<String, String>> table2 = Collections.singletonList(
new KeyValue<>("ID123", "BBB")
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
"1",
"A",
currentTime + 42L,
asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
singletonList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
);

processKeyValueAndVerifyCount(
Expand Down Expand Up @@ -200,7 +200,7 @@ public void shouldRestartWithTopologyOptimizationOn() throws Exception {
"1",
"A",
currentTime + 42L,
asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
singletonList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
);

processKeyValueAndVerifyCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testLeft(final boolean cacheEnabled) {
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
Collections.singletonList(
new TestRecord<>(null, "E-null", null, 16L)),
null
);
Expand Down Expand Up @@ -285,7 +285,7 @@ public void testLeftRepartitioned(final boolean cacheEnabled) {
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
Collections.singletonList(
new TestRecord<>(null, "E-null", null, 16L)),
null
);
Expand Down Expand Up @@ -340,9 +340,9 @@ public void testOuter(final boolean cacheEnabled) {
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
Collections.singletonList(
new TestRecord<>(null, "E-null", null, 16L)),
Arrays.asList(
Collections.singletonList(
new TestRecord<>(null, "null-e", null, 17L))
);

Expand Down Expand Up @@ -394,9 +394,9 @@ public void testOuterRepartitioned(final boolean cacheEnabled) {
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
Collections.singletonList(
new TestRecord<>(null, "E-null", null, 16L)),
Arrays.asList(
Collections.singletonList(
new TestRecord<>(null, "null-e", null, 17L))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void closeCluster() {
private String outputTopic2;
private final StreamsBuilder builder = new StreamsBuilder();
private final List<String> processorValueCollector = new ArrayList<>();
private static AtomicBoolean throwError = new AtomicBoolean(true);
private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);

private Properties properties;

Expand Down Expand Up @@ -226,10 +226,10 @@ public void close() {
@Override
public void process(final Record<KIn, VIn> record) {
valueList.add(record.value().toString());
if (throwError.get()) {
if (THROW_ERROR.get()) {
throw new StreamsException(Thread.currentThread().getName());
}
throwError.set(true);
THROW_ERROR.set(true);
}
}

Expand Down Expand Up @@ -382,15 +382,15 @@ private void testReplaceThreads(final int numThreads) throws Exception {
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {
throwError.set(false);
THROW_ERROR.set(false);
}
return REPLACE_THREAD;
});
startApplicationAndWaitUntilRunning(kafkaStreams);

produceMessages(NOW, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
TestUtils.waitForCondition(THROW_ERROR::get, "finished replacing threads");
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ public static void closeCluster() {
private String inputTopic;
private static StreamsBuilder builder;
private static Properties properties;
private static String appIdPrefix = "TaskMetadataTest_";
private static final String APP_ID_PREFIX = "TaskMetadataTest_";
private static String appId;
private AtomicBoolean process;
private AtomicBoolean commit;

@BeforeEach
public void setup(final TestInfo testInfo) {
final String testId = safeUniqueTestName(testInfo);
appId = appIdPrefix + testId;
appId = APP_ID_PREFIX + testId;
inputTopic = "input" + testId;
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void shouldCreateProcessorThatPrintsToFile() throws IOException {
try (final InputStream stream = Files.newInputStream(file.toPath())) {
final byte[] data = new byte[stream.available()];
stream.read(data);
assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]: hi, 1\n"));
assertThat(new String(data, StandardCharsets.UTF_8), equalTo("[processor]: hi, 1\n"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1235,9 +1235,7 @@ private GraphNode getNodeByType(
return currentNode;
}
for (final GraphNode child: currentNode.children()) {
if (!visited.contains(child)) {
visited.add(child);
}
visited.add(child);
final GraphNode result = getNodeByType(child, clazz, visited);
if (result != null) {
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ public void shouldSendDataToDynamicTopics() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(input, stringConsumed);
stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1),
stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.charAt(0),
Produced.with(Serdes.String(), Serdes.String()));
builder.stream(input + "-a-v", stringConsumed).process(processorSupplier);
builder.stream(input + "-b-v", stringConsumed).process(processorSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
Expand Down Expand Up @@ -71,8 +72,8 @@ public void testKStreamSplit() {
final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
assertEquals(Arrays.asList("V3"), x3.readValuesToList());
assertEquals(Arrays.asList("V5"), x5.readValuesToList());
assertEquals(Collections.singletonList("V3"), x3.readValuesToList());
assertEquals(Collections.singletonList("V5"), x5.readValuesToList());
});
}

Expand Down Expand Up @@ -128,9 +129,9 @@ public void testResultingMap() {
final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
assertEquals(Arrays.asList("V7"), x7.readValuesToList());
assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
assertEquals(Collections.singletonList("V-1"), negative.readValuesToList());
assertEquals(Collections.singletonList("V7"), x7.readValuesToList());
assertEquals(Collections.singletonList("V1"), defaultBranch.readValuesToList());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private void setFlushListener(final boolean sendOldValues) {

new TimestampedTupleForwarder<>(
store,
(org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<Object>>) null,
null,
flushListener,
sendOldValues
);
Expand Down Expand Up @@ -82,7 +82,7 @@ private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean s
final TimestampedTupleForwarder<String, String> forwarder =
new TimestampedTupleForwarder<>(
store,
(org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
context,
null,
sendOldValues
);
Expand All @@ -102,7 +102,7 @@ public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
final TimestampedTupleForwarder<String, String> forwarder =
new TimestampedTupleForwarder<>(
store,
(org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
context,
null,
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class UnlimitedWindowTest {

private long start = 50;
private final long start = 50;
private final UnlimitedWindow window = new UnlimitedWindow(start);
private final SessionWindow sessionWindow = new SessionWindow(start, start);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@

public class WindowedStreamPartitionerTest {

private String topicName = "topic";
private final String topicName = "topic";

private IntegerSerializer intSerializer = new IntegerSerializer();
private StringSerializer stringSerializer = new StringSerializer();
private final IntegerSerializer intSerializer = new IntegerSerializer();
private final StringSerializer stringSerializer = new StringSerializer();

private List<PartitionInfo> infos = Arrays.asList(
private final List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
Expand All @@ -52,7 +52,7 @@ public class WindowedStreamPartitionerTest {
new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
);

private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
Collections.emptySet(), Collections.emptySet());

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class DefaultStateUpdaterTest {
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
private DefaultStateUpdater stateUpdater =
private final DefaultStateUpdater stateUpdater =
new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time);

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.mockito.quality.Strictness;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -363,8 +362,8 @@ public void shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTime
final InternalTopicManager internalTopicManager =
new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
try {
final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1));
final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2));
final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1));
final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2));

internalTopicManager.getNumPartitions(topic1set, topic2set);

Expand All @@ -375,8 +374,8 @@ public void shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTime
mockAdminClient.timeoutNextRequest(1);

try {
final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1));
final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2));
final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1));
final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2));

internalTopicManager.getNumPartitions(topic1set, topic2set);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class MockChangelogReader implements ChangelogReader {
private final Set<TopicPartition> restoringPartitions = new HashSet<>();
private Map<TopicPartition, Long> restoredOffsets = Collections.emptyMap();
private final Map<TopicPartition, Long> restoredOffsets = Collections.emptyMap();

public boolean isPartitionRegistered(final TopicPartition partition) {
return restoringPartitions.contains(partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ public void shouldLoadMissingFileAsEmptyPosition() {
}

public static class StateStorePositionCommit implements CommitCallback {
private File file;
private final File file;
private final OffsetCheckpoint checkpointFile;
private final Position position;

Expand Down
Loading