diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 83c11bd16c244..d232f008de740 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -120,24 +120,18 @@ public S getOrCreateSegmentIfLive(final long segmentId, @Override public void openExisting(final StateStoreContext context, final long streamTime) { - try { - final File dir = new File(context.stateDir(), name); - if (dir.exists()) { - final String[] list = dir.list(); - if (list != null) { - Arrays.stream(list) - .map(segment -> segmentIdFromSegmentName(segment, dir)) - .sorted() // open segments in the id order - .filter(segmentId -> segmentId >= 0) - .forEach(segmentId -> getOrCreateSegment(segmentId, context)); - } - } else { - if (!dir.mkdir()) { - throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name)); - } + final File dir = new File(context.stateDir(), name); + if (dir.exists() && dir.isDirectory()) { + final String[] list = dir.list(); + Arrays.stream(list) + .map(segment -> segmentIdFromSegmentName(segment, dir)) + .filter(segmentId -> segmentId >= 0) + .sorted() // open segments in the id order + .forEach(segmentId -> getOrCreateSegment(segmentId, context)); + } else { + if (!dir.mkdir()) { + throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name)); } - } catch (final Exception ex) { - // ignore } cleanupExpiredSegments(streamTime); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java index 96995d047bac8..45b5b3078376c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java @@ -49,7 +49,7 @@ public abstract class AbstractRocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { - private static final String STORE_NAME = "rocksDB window store"; + private static final String STORE_NAME = "rocksDB-windowstore"; private static final String METRICS_SCOPE = "test-state-id"; private final KeyValueSegments segments = @@ -602,7 +602,7 @@ public void testRestore() throws Exception { windowStore.close(); // remove local store image - Utils.delete(baseDir); + Utils.delete(new File(baseDir, STORE_NAME)); windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java index 50da7b5d9f2ca..ee0c41802bbf9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java @@ -305,7 +305,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); @@ -327,7 +327,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java index ef9f5625d7a1f..f9c608dbb93a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java @@ -306,7 +306,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L); @@ -328,7 +328,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1))); //noinspection ResultOfMethodCallIgnored - Files.createFile(oldSegment.toPath()); + Files.createDirectory(oldSegment.toPath()); } segments.openExisting(context, -1L);