Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,18 @@ public S getOrCreateSegmentIfLive(final long segmentId,

@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
try {
final File dir = new File(context.stateDir(), name);
if (dir.exists()) {
final String[] list = dir.list();
if (list != null) {
Arrays.stream(list)
.map(segment -> segmentIdFromSegmentName(segment, dir))
.sorted() // open segments in the id order
.filter(segmentId -> segmentId >= 0)
.forEach(segmentId -> getOrCreateSegment(segmentId, context));
}
} else {
if (!dir.mkdir()) {
throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
}
final File dir = new File(context.stateDir(), name);
if (dir.exists() && dir.isDirectory()) {
final String[] list = dir.list();
Arrays.stream(list)
.map(segment -> segmentIdFromSegmentName(segment, dir))
.filter(segmentId -> segmentId >= 0)
.sorted() // open segments in the id order
.forEach(segmentId -> getOrCreateSegment(segmentId, context));
} else {
if (!dir.mkdir()) {
throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
}
} catch (final Exception ex) {
// ignore
}

cleanupExpiredSegments(streamTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

public abstract class AbstractRocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {

private static final String STORE_NAME = "rocksDB window store";
private static final String STORE_NAME = "rocksDB-windowstore";
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related -- but avoiding spaces sound like a good idea

private static final String METRICS_SCOPE = "test-state-id";

private final KeyValueSegments segments =
Expand Down Expand Up @@ -602,7 +602,7 @@ public void testRestore() throws Exception {
windowStore.close();

// remove local store image
Utils.delete(baseDir);
Utils.delete(new File(baseDir, STORE_NAME));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We delete a directory "too high" in the hierarchy, and thus when we later try to create a segment we cannot, because the parent directory does not exist.


windowStore = buildWindowStore(RETENTION_PERIOD,
WINDOW_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
//noinspection ResultOfMethodCallIgnored
Files.createFile(oldSegment.toPath());
Files.createDirectory(oldSegment.toPath());
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Segments are directories -- if we create it here as a file, we later fail trying to create it as directory (create dir passed if dir already exists)

}

segments.openExisting(context, -1L);
Expand All @@ -327,7 +327,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
//noinspection ResultOfMethodCallIgnored
Files.createFile(oldSegment.toPath());
Files.createDirectory(oldSegment.toPath());
}

segments.openExisting(context, -1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
//noinspection ResultOfMethodCallIgnored
Files.createFile(oldSegment.toPath());
Files.createDirectory(oldSegment.toPath());
}

segments.openExisting(context, -1L);
Expand All @@ -328,7 +328,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
//noinspection ResultOfMethodCallIgnored
Files.createFile(oldSegment.toPath());
Files.createDirectory(oldSegment.toPath());
}

segments.openExisting(context, -1L);
Expand Down