Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,35 @@ object VeloxConfig {
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_SSD_CHCEKPOINT_DISABLE_FILE_COW =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow")
.internal()
.doc("True if copy on write should be disabled.")
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_SSD_CHCEKPOINT_CHECKSUM_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled")
.internal()
.doc("If true, checksum write to SSD is enabled.")
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_SSD_CHCEKPOINT_CHECKSUM_READ_VERIFICATION_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled")
.internal()
.doc("If true, checksum read verification from SSD is enabled.")
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_SSD_CHCEKPOINT_INTERVAL_SIZE =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes")
.internal()
.doc("Checkpoint after every 'checkpointIntervalBytes' for SSD cache. " +
"0 means no checkpointing.")
.intConf
.createWithDefault(0)

val COLUMNAR_VELOX_CONNECTOR_IO_THREADS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads")
.internal()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution.{BasicScanExecTransformer, VeloxWholeStageTransformerSuite}

import org.apache.spark.SparkConf

import java.io.File

class VeloxLocalCacheSuite extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "/parquet-for-read"
override protected val fileFormat: String = "parquet"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set(VeloxConfig.LOAD_QUANTUM.key, "8m")
.set(VeloxConfig.COLUMNAR_VELOX_CACHE_ENABLED.key, "true")
.set(VeloxConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, "true")
}

testWithSpecifiedSparkVersion("read example parquet files", Some("3.5"), Some("3.5")) {
withTable("test_table") {
val dir = new File(getClass.getResource(resourcePath).getFile)
val files = dir.listFiles
if (files != null) {
files.foreach {
file =>
// Exclude parquet files failed to read by velox for now
if (file.getName != "test-file-with-no-column-indexes-1.parquet") {
val df = spark.read.parquet(file.getAbsolutePath)
df.createOrReplaceTempView("test_table")
runQueryAndCompare("select * from test_table") {
checkGlutenOperatorMatch[BasicScanExecTransformer]
}
}
}
}
}
}
}
65 changes: 41 additions & 24 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ void VeloxBackend::init(
#endif

initJolFilesystem();
initCache();
initConnector();

velox::dwio::common::registerFileSinks();
Expand Down Expand Up @@ -194,6 +193,10 @@ void VeloxBackend::init(
}
LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity;
facebook::velox::memory::initializeMemoryManager({.allocatorCapacity = memoryManagerCapacity});

// local cache persistent relies on the cache pool from root memory pool so we need to init this
// after the memory manager instanced
initCache();
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
Expand All @@ -210,30 +213,45 @@ void VeloxBackend::initJolFilesystem() {
registerJolFileSystem(maxSpillFileSize);
}

std::unique_ptr<facebook::velox::cache::SsdCache> VeloxBackend::initSsdCache(uint64_t ssdCacheSize) {
FLAGS_velox_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);
int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = backendConf_->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault);
uint64_t ssdCheckpointIntervalSize = backendConf_->get<uint64_t>(kVeloxSsdCheckpointIntervalBytes, 0);
bool disableFileCow = backendConf_->get<bool>(kVeloxSsdDisableFileCow, false);
bool checksumEnabled = backendConf_->get<bool>(kVeloxSsdCheckSumEnabled, false);
bool checksumReadVerificationEnabled = backendConf_->get<bool>(kVeloxSsdCheckSumReadVerificationEnabled, false);

cachePathPrefix_ = ssdCachePathPrefix;
cacheFilePrefix_ = getCacheFilePrefix();
std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_;
ssdCacheExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ssdCacheIOThreads);
const cache::SsdCache::Config config(
ssdCachePath,
ssdCacheSize,
ssdCacheShards,
ssdCacheExecutor_.get(),
ssdCheckpointIntervalSize,
disableFileCow,
checksumEnabled,
checksumReadVerificationEnabled);
auto ssd = std::make_unique<velox::cache::SsdCache>(config);
std::error_code ec;
const std::filesystem::space_info si = std::filesystem::space(ssdCachePathPrefix, ec);
if (si.available < ssdCacheSize) {
VELOX_FAIL(
"not enough space for ssd cache in " + ssdCachePath + " cache size: " + std::to_string(ssdCacheSize) +
"free space: " + std::to_string(si.available));
}
LOG(INFO) << "Initializing SSD cache with: " << config.toString();
return ssd;
}

void VeloxBackend::initCache() {
if (backendConf_->get<bool>(kVeloxCacheEnabled, false)) {
FLAGS_velox_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);

uint64_t memCacheSize = backendConf_->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = backendConf_->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = backendConf_->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault);

cachePathPrefix_ = ssdCachePathPrefix;
cacheFilePrefix_ = getCacheFilePrefix();
std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_;
ssdCacheExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ssdCacheIOThreads);
const cache::SsdCache::Config config(ssdCachePath, ssdCacheSize, ssdCacheShards, ssdCacheExecutor_.get());
auto ssd = std::make_unique<velox::cache::SsdCache>(config);

std::error_code ec;
const std::filesystem::space_info si = std::filesystem::space(ssdCachePathPrefix, ec);
if (si.available < ssdCacheSize) {
VELOX_FAIL(
"not enough space for ssd cache in " + ssdCachePath + " cache size: " + std::to_string(ssdCacheSize) +
"free space: " + std::to_string(si.available));
}

velox::memory::MmapAllocator::Options options;
options.capacity = memCacheSize;
Expand All @@ -244,13 +262,12 @@ void VeloxBackend::initCache() {
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get());
} else {
// TODO: this is not tracked by Spark.
auto ssd = initSsdCache(ssdCacheSize);
asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd));
}

VELOX_CHECK_NOT_NULL(dynamic_cast<velox::cache::AsyncDataCache*>(asyncDataCache_.get()));
LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize
<< ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize
<< ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads;
LOG(INFO) << "AsyncDataCache is ready";
}
}

Expand Down
26 changes: 14 additions & 12 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,7 @@ inline static const std::string kVeloxBackendKind{"velox"};
/// Should not put heavily work here.
class VeloxBackend {
public:
~VeloxBackend() {
if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) {
LOG(INFO) << asyncDataCache_->toString();
for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
LOG(INFO) << "Removing cache file " << entry.path().filename().string();
std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string());
}
}
asyncDataCache_->shutdown();
}
}
~VeloxBackend() {}

static void create(
std::unique_ptr<AllocationListener> listener,
Expand All @@ -73,6 +62,18 @@ class VeloxBackend {
// So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed.
ioExecutor_.reset();
globalMemoryManager_.reset();

// dump cache stats on exit if enabled
if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) {
LOG(INFO) << asyncDataCache_->toString();
for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
LOG(INFO) << "Removing cache file " << entry.path().filename().string();
std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string());
}
}
asyncDataCache_->shutdown();
}
}

private:
Expand All @@ -86,6 +87,7 @@ class VeloxBackend {
void initCache();
void initConnector();
void initUdf();
std::unique_ptr<facebook::velox::cache::SsdCache> initSsdCache(uint64_t ssdSize);

void initJolFilesystem();

Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ const uint32_t kVeloxSsdCacheShardsDefault = 1;
const std::string kVeloxSsdCacheIOThreads = "spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads";
const uint32_t kVeloxSsdCacheIOThreadsDefault = 1;
const std::string kVeloxSsdODirectEnabled = "spark.gluten.sql.columnar.backend.velox.ssdODirect";
const std::string kVeloxSsdCheckpointIntervalBytes =
"spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes";
const std::string kVeloxSsdDisableFileCow = "spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow";
const std::string kVeloxSsdCheckSumEnabled = "spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled";
const std::string kVeloxSsdCheckSumReadVerificationEnabled =
"spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled";

// async
const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads";
Expand Down