Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fe/check/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ under the License.
<suppress files="[\\/]com[\\/]amazonaws[\\/]glue[\\/]catalog[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="[\\/]com[\\/]aliyun[\\/]datalake[\\/]metastore[\\/]hive2[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="FSDataInputStreamWrapper\.java" checks="[a-zA-Z0-9]*"/>

<!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java -->
<suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/>
</suppressions>
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util.collection;

import org.apache.hudi.common.util.FileIOUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;

/* Copied From
* https://github.com/apache/hudi/blob/release-0.15.0/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
* Doris Modification.
* Use Static cleaner class to avoid circular references in shutdown hooks
*/

/**
* This interface provides the map interface for storing records in disk after
* they
* spill over from memory. Used by {@link ExternalSpillableMap}.
*
* @param <T> The generic type of the keys
* @param <R> The generic type of the values
*/
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {

private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class);
private static final String SUBFOLDER_PREFIX = "hudi";
private final File diskMapPathFile;
private transient Thread shutdownThread = null;

// Base path for the write file
protected final String diskMapPath;

public DiskMap(String basePath, String prefix) throws IOException {
this.diskMapPath = String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString());
diskMapPathFile = new File(diskMapPath);
FileIOUtils.deleteDirectory(diskMapPathFile);
FileIOUtils.mkdir(diskMapPathFile);
// Make sure the folder is deleted when JVM exits
diskMapPathFile.deleteOnExit();
addShutDownHook();
}

/**
* Register shutdown hook to force flush contents of the data written to
* FileOutputStream from OS page cache
* (typically 4 KB) to disk.
*/
private void addShutDownHook() {
// Register this disk map path with the static cleaner instead of using an
// instance-specific hook
DiskMapCleaner.registerForCleanup(diskMapPath);
}

/**
* @returns a stream of the values stored in the disk.
*/
abstract Stream<R> valueStream();

/**
* Number of bytes spilled to disk.
*/
abstract long sizeOfFileOnDiskInBytes();

/**
* Close and cleanup the Map.
*/
public void close() {
cleanup(false);
}

/**
* Cleanup all resources, files and folders
* triggered by shutdownhook.
*/
private void cleanup() {
cleanup(true);
}

/**
* Cleanup all resources, files and folders.
*/
private void cleanup(boolean isTriggeredFromShutdownHook) {
// Reuse the static cleaner method to clean the directory
DiskMapCleaner.cleanupDirectory(diskMapPath);

// Deregister from the static cleaner
if (!isTriggeredFromShutdownHook) {
DiskMapCleaner.deregisterFromCleanup(diskMapPath);
}
}

/**
* Static cleaner class to avoid circular references in shutdown hooks
*/
private static class DiskMapCleaner {
private static final Logger CLEANER_LOG = LoggerFactory.getLogger(DiskMapCleaner.class);
private static final Set<String> PATHS_TO_CLEAN = Collections.synchronizedSet(new HashSet<>());
private static final Thread SHUTDOWN_HOOK;

static {
// Register a single JVM-wide shutdown hook that handles all paths
SHUTDOWN_HOOK = new Thread(() -> {
synchronized (PATHS_TO_CLEAN) {
PATHS_TO_CLEAN.forEach(DiskMapCleaner::cleanupDirectory);
PATHS_TO_CLEAN.clear();
}
});
Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
}

/**
* Register a path to be cleaned up when JVM exits
*
* @param directoryPath Path to register for cleanup
*/
public static void registerForCleanup(String directoryPath) {
PATHS_TO_CLEAN.add(directoryPath);
}

/**
* Deregister a path from cleanup when it's manually cleaned
*
* @param directoryPath Path to deregister from cleanup
*/
public static void deregisterFromCleanup(String directoryPath) {
PATHS_TO_CLEAN.remove(directoryPath);
}

/**
* Static cleanup method that doesn't hold references to DiskMap instances
*
* @param directoryPath Path to the directory that needs to be cleaned up
*/
public static void cleanupDirectory(String directoryPath) {
try {
FileIOUtils.deleteDirectory(new File(directoryPath));
} catch (IOException exception) {
CLEANER_LOG.warn("Error while deleting the disk map directory=" + directoryPath, exception);
}
}
}
}