diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 6fa91364f66a..45f57a6e5e75 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -157,6 +157,35 @@ object VeloxConfig { .booleanConf .createWithDefault(false) + val COLUMNAR_VELOX_SSD_CHECKPOINT_DISABLE_FILE_COW = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow") + .internal() + .doc("True if copy on write should be disabled.") + .booleanConf + .createWithDefault(false) + + val COLUMNAR_VELOX_SSD_CHECKPOINT_CHECKSUM_ENABLED = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled") + .internal() + .doc("If true, checksum write to SSD is enabled.") + .booleanConf + .createWithDefault(false) + + val COLUMNAR_VELOX_SSD_CHECKPOINT_CHECKSUM_READ_VERIFICATION_ENABLED = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled") + .internal() + .doc("If true, checksum read verification from SSD is enabled.") + .booleanConf + .createWithDefault(false) + + val COLUMNAR_VELOX_SSD_CHECKPOINT_INTERVAL_SIZE = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes") + .internal() + .doc("Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 
" + + "0 means no checkpointing.") + .longConf + .createWithDefault(0L) + val COLUMNAR_VELOX_CONNECTOR_IO_THREADS = buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads") .internal() diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxLocalCacheSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxLocalCacheSuite.scala new file mode 100644 index 000000000000..bb50399c134d --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxLocalCacheSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.config.VeloxConfig +import org.apache.gluten.execution.{BasicScanExecTransformer, VeloxWholeStageTransformerSuite} + +import org.apache.spark.SparkConf + +import java.io.File + +class VeloxLocalCacheSuite extends VeloxWholeStageTransformerSuite { + override protected val resourcePath: String = "/parquet-for-read" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set(VeloxConfig.LOAD_QUANTUM.key, "8m") + .set(VeloxConfig.COLUMNAR_VELOX_CACHE_ENABLED.key, "true") + .set(VeloxConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, "true") + } + + testWithSpecifiedSparkVersion("read example parquet files", Some("3.5"), Some("3.5")) { + withTable("test_table") { + val dir = new File(getClass.getResource(resourcePath).getFile) + val files = dir.listFiles + if (files != null) { + files.foreach { + file => + // Exclude parquet files failed to read by velox for now + if (file.getName != "test-file-with-no-column-indexes-1.parquet") { + val df = spark.read.parquet(file.getAbsolutePath) + df.createOrReplaceTempView("test_table") + runQueryAndCompare("select * from test_table") { + checkGlutenOperatorMatch[BasicScanExecTransformer] + } + } + } + } + } + } +} diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 44313fbb39fd..03268eeefd24 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -155,7 +155,6 @@ void VeloxBackend::init( #endif initJolFilesystem(); - initCache(); initConnector(); velox::dwio::common::registerFileSinks(); @@ -194,6 +193,10 @@ void VeloxBackend::init( } LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity; facebook::velox::memory::initializeMemoryManager({.allocatorCapacity = memoryManagerCapacity}); + + // local cache persistent relies on the cache pool from root memory pool so we need to init 
this + // after the memory manager is instantiated + initCache(); } facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const { @@ -210,30 +213,45 @@ void VeloxBackend::initJolFilesystem() { registerJolFileSystem(maxSpillFileSize); } +std::unique_ptr<cache::SsdCache> VeloxBackend::initSsdCache(uint64_t ssdCacheSize) { + FLAGS_velox_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false); + int32_t ssdCacheShards = backendConf_->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault); + int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault); + std::string ssdCachePathPrefix = backendConf_->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault); + uint64_t ssdCheckpointIntervalSize = backendConf_->get<uint64_t>(kVeloxSsdCheckpointIntervalBytes, 0); + bool disableFileCow = backendConf_->get<bool>(kVeloxSsdDisableFileCow, false); + bool checksumEnabled = backendConf_->get<bool>(kVeloxSsdCheckSumEnabled, false); + bool checksumReadVerificationEnabled = backendConf_->get<bool>(kVeloxSsdCheckSumReadVerificationEnabled, false); + + cachePathPrefix_ = ssdCachePathPrefix; + cacheFilePrefix_ = getCacheFilePrefix(); + std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_; + // Fail fast if the volume cannot hold the configured cache size, before any cache file is created. + std::error_code ec; + const std::filesystem::space_info si = std::filesystem::space(ssdCachePathPrefix, ec); + if (si.available < ssdCacheSize) { + VELOX_FAIL( + "not enough space for ssd cache in " + ssdCachePath + " cache size: " + std::to_string(ssdCacheSize) + + " free space: " + std::to_string(si.available)); + } + ssdCacheExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ssdCacheIOThreads); + const cache::SsdCache::Config config( + ssdCachePath, + ssdCacheSize, + ssdCacheShards, + ssdCacheExecutor_.get(), + ssdCheckpointIntervalSize, + disableFileCow, + checksumEnabled, + checksumReadVerificationEnabled); + LOG(INFO) << "Initializing SSD cache with: " << config.toString(); + return std::make_unique<cache::SsdCache>(config); +} + void VeloxBackend::initCache() { if 
(backendConf_->get(kVeloxCacheEnabled, false)) { - FLAGS_velox_ssd_odirect = backendConf_->get(kVeloxSsdODirectEnabled, false); - uint64_t memCacheSize = backendConf_->get(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault); uint64_t ssdCacheSize = backendConf_->get(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault); - int32_t ssdCacheShards = backendConf_->get(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault); - int32_t ssdCacheIOThreads = backendConf_->get(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault); - std::string ssdCachePathPrefix = backendConf_->get(kVeloxSsdCachePath, kVeloxSsdCachePathDefault); - - cachePathPrefix_ = ssdCachePathPrefix; - cacheFilePrefix_ = getCacheFilePrefix(); - std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_; - ssdCacheExecutor_ = std::make_unique(ssdCacheIOThreads); - const cache::SsdCache::Config config(ssdCachePath, ssdCacheSize, ssdCacheShards, ssdCacheExecutor_.get()); - auto ssd = std::make_unique(config); - - std::error_code ec; - const std::filesystem::space_info si = std::filesystem::space(ssdCachePathPrefix, ec); - if (si.available < ssdCacheSize) { - VELOX_FAIL( - "not enough space for ssd cache in " + ssdCachePath + " cache size: " + std::to_string(ssdCacheSize) + - "free space: " + std::to_string(si.available)); - } velox::memory::MmapAllocator::Options options; options.capacity = memCacheSize; @@ -244,13 +262,12 @@ void VeloxBackend::initCache() { asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get()); } else { // TODO: this is not tracked by Spark. 
+ auto ssd = initSsdCache(ssdCacheSize); asyncDataCache_ = velox::cache::AsyncDataCache::create(cacheAllocator_.get(), std::move(ssd)); } VELOX_CHECK_NOT_NULL(dynamic_cast(asyncDataCache_.get())); - LOG(INFO) << "STARTUP: Using AsyncDataCache memory cache size: " << memCacheSize - << ", ssdCache prefix: " << ssdCachePath << ", ssdCache size: " << ssdCacheSize - << ", ssdCache shards: " << ssdCacheShards << ", ssdCache IO threads: " << ssdCacheIOThreads; + LOG(INFO) << "AsyncDataCache is ready"; } } diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 86c77d450864..e52277d8612d 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -38,18 +38,7 @@ inline static const std::string kVeloxBackendKind{"velox"}; /// Should not put heavily work here. class VeloxBackend { public: - ~VeloxBackend() { - if (dynamic_cast(asyncDataCache_.get())) { - LOG(INFO) << asyncDataCache_->toString(); - for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) { - if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) { - LOG(INFO) << "Removing cache file " << entry.path().filename().string(); - std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string()); - } - } - asyncDataCache_->shutdown(); - } - } + ~VeloxBackend() {} static void create( std::unique_ptr listener, @@ -73,6 +62,18 @@ class VeloxBackend { // So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed. 
ioExecutor_.reset(); globalMemoryManager_.reset(); + + // Dump cache stats, remove this session's local SSD cache files, and shut down the async data cache on exit. + if (dynamic_cast(asyncDataCache_.get())) { + LOG(INFO) << asyncDataCache_->toString(); + for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) { + if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) { + LOG(INFO) << "Removing cache file " << entry.path().filename().string(); + std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string()); + } + } + asyncDataCache_->shutdown(); + } } private: @@ -86,6 +87,7 @@ class VeloxBackend { void initCache(); void initConnector(); void initUdf(); + std::unique_ptr initSsdCache(uint64_t ssdCacheSize); void initJolFilesystem(); diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index cb3f89107245..3b9ef56dd3a5 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -102,6 +102,12 @@ const uint32_t kVeloxSsdCacheShardsDefault = 1; const std::string kVeloxSsdCacheIOThreads = "spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads"; const uint32_t kVeloxSsdCacheIOThreadsDefault = 1; const std::string kVeloxSsdODirectEnabled = "spark.gluten.sql.columnar.backend.velox.ssdODirect"; +const std::string kVeloxSsdCheckpointIntervalBytes = + "spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes"; +const std::string kVeloxSsdDisableFileCow = "spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow"; +const std::string kVeloxSsdCheckSumEnabled = "spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled"; +const std::string kVeloxSsdCheckSumReadVerificationEnabled = + "spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled"; // async const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads";