From d2fe98fa25d2603ad5ff13ddbdac9d87b69bf43b Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 4 Mar 2026 19:33:51 -0800 Subject: [PATCH 1/2] MINOR: AbstractSegments should not swallow exceptions --- .../state/internals/AbstractSegments.java | 30 ++++++++----------- .../AbstractRocksDBWindowStoreTest.java | 4 +-- .../state/internals/KeyValueSegmentsTest.java | 4 +-- .../internals/TimestampedSegmentsTest.java | 4 +-- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 83c11bd16c244..23cc1587d3b4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -120,24 +120,20 @@ public S getOrCreateSegmentIfLive(final long segmentId, @Override public void openExisting(final StateStoreContext context, final long streamTime) { - try { - final File dir = new File(context.stateDir(), name); - if (dir.exists()) { - final String[] list = dir.list(); - if (list != null) { - Arrays.stream(list) - .map(segment -> segmentIdFromSegmentName(segment, dir)) - .sorted() // open segments in the id order - .filter(segmentId -> segmentId >= 0) - .forEach(segmentId -> getOrCreateSegment(segmentId, context)); - } - } else { - if (!dir.mkdir()) { - throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name)); - } + final File dir = new File(context.stateDir(), name); + if (dir.exists()) { + final String[] list = dir.list(); + if (list != null) { + Arrays.stream(list) + .map(segment -> segmentIdFromSegmentName(segment, dir)) + .sorted() // open segments in the id order + .filter(segmentId -> segmentId >= 0) + .forEach(segmentId -> getOrCreateSegment(segmentId, context)); + } + } else { + if (!dir.mkdir()) { + throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name)); } - } catch (final Exception ex) { - // ignore } cleanupExpiredSegments(streamTime); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java index 96995d047bac8..45b5b3078376c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java @@ -49,7 +49,7 @@ public abstract class AbstractRocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { - private static final String STORE_NAME = "rocksDB window store"; + private static final String STORE_NAME = "rocksDB-windowstore"; private static final String METRICS_SCOPE = "test-state-id"; private final KeyValueSegments segments = @@ -602,7 +602,7 @@ public void testRestore() throws Exception { windowStore.close(); // remove local store image - Utils.delete(baseDir); + Utils.delete(new File(baseDir, STORE_NAME)); windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java index 50da7b5d9f2ca..ee0c41802bbf9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java @@ -305,7 +305,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); @@ -327,7 +327,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java index ef9f5625d7a1f..f9c608dbb93a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java @@ -306,7 +306,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); @@ -328,7 +328,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); From e77decd8e72ee2aa75c48b8a1781264b64640a11 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 5 Mar 2026 19:33:47 -0800 Subject: [PATCH 2/2] review comments --- .../streams/state/internals/AbstractSegments.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 23cc1587d3b4f..d232f008de740 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -121,15 +121,13 @@ public S getOrCreateSegmentIfLive(final long segmentId, @Override public void openExisting(final StateStoreContext context, final long streamTime) { final File dir = new File(context.stateDir(), name); - if (dir.exists()) { + if (dir.exists() && dir.isDirectory()) { final String[] list = dir.list(); - if (list != null) { - Arrays.stream(list) - .map(segment -> segmentIdFromSegmentName(segment, dir)) - .sorted() // open segments in the id order - .filter(segmentId -> segmentId >= 0) - .forEach(segmentId -> getOrCreateSegment(segmentId, context)); - } + Arrays.stream(list) + .map(segment -> segmentIdFromSegmentName(segment, dir)) + .filter(segmentId -> segmentId >= 0) + .sorted() // open segments in the id order + .forEach(segmentId -> getOrCreateSegment(segmentId, context)); } else { if (!dir.mkdir()) { throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));