From e1c13c2b05ab94a3d5679eced56550543279051d Mon Sep 17 00:00:00 2001 From: Siyuan Date: Wed, 11 Mar 2020 21:01:08 -0700 Subject: [PATCH 1/2] fix bug & improve code --- cpp/src/plasma/store.cc | 144 ++++---- cpp/src/plasma/test/client_tests.cc | 387 ++++++++++---------- cpp/src/plasma/test/external_store_tests.cc | 18 +- 3 files changed, 270 insertions(+), 279 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 977ba9fb260..9d7f2f1f35c 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -53,7 +53,6 @@ #include #include "arrow/status.h" - #include "plasma/common.h" #include "plasma/common_generated.h" #include "plasma/fling.h" @@ -110,8 +109,8 @@ GetRequest::GetRequest(Client* client, const std::vector& object_ids) Client::Client(int fd) : fd(fd), notification_fd(-1) {} -PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, - const std::string& socket_name, +PlasmaStore::PlasmaStore(EventLoop* loop, const std::string& directory, + bool hugepages_enabled, const std::string& socket_name, std::shared_ptr external_store) : loop_(loop), eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()), @@ -1142,8 +1141,8 @@ class PlasmaStoreRunner { public: PlasmaStoreRunner() {} - void Start(char* socket_name, std::string directory, bool hugepages_enabled, - std::shared_ptr external_store) { + void Start(const std::string& socket_name, std::string directory, + bool hugepages_enabled, std::shared_ptr external_store) { // Create the event loop. loop_.reset(new EventLoop); store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, socket_name, @@ -1154,15 +1153,14 @@ class PlasmaStoreRunner { // large amount of space up front. According to the documentation, // dlmalloc might need up to 128*sizeof(size_t) bytes for internal // bookkeeping. - void* pointer = plasma::PlasmaAllocator::Memalign( - kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); + auto prealloc_memory = PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t); + void* pointer = PlasmaAllocator::Memalign(kBlockSize, prealloc_memory); ARROW_CHECK(pointer != nullptr); // This will unmap the file, but the next one created will be as large // as this one (this is an implementation detail of dlmalloc). - plasma::PlasmaAllocator::Free( - pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); + PlasmaAllocator::Free(pointer, prealloc_memory); - int socket = BindIpcSock(socket_name, true); + int socket = BindIpcSock(socket_name.c_str(), true); // TODO(pcm): Check return value. ARROW_CHECK(socket >= 0); @@ -1196,8 +1194,65 @@ void HandleSignal(int signal) { } } -void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled, - std::shared_ptr external_store) { +void StartServer(std::string socket_name, int64_t system_memory, + std::string plasma_directory, bool hugepages_enabled, + std::string external_store_endpoint) { + if (plasma_directory.empty()) { +#ifdef __linux__ + plasma_directory = "/dev/shm"; +#else + plasma_directory = "/tmp"; +#endif + } + ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory + << " and huge page support " + << (hugepages_enabled ? "enabled" : "disabled"); +#ifdef __linux__ + if (!hugepages_enabled) { + // On Linux, check that the amount of memory available in /dev/shm is large + // enough to accommodate the request. If it isn't, then fail. + int shm_fd = open(plasma_directory.c_str(), O_RDONLY); + struct statvfs shm_vfs_stats; + fstatvfs(shm_fd, &shm_vfs_stats); + // The value shm_vfs_stats.f_bsize is the block size, and the value + // shm_vfs_stats.f_bavail is the number of available blocks. + int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; + close(shm_fd); + // Keep some safety margin for allocator fragmentation. + shm_mem_avail = 9 * shm_mem_avail / 10; + if (system_memory > shm_mem_avail) { + ARROW_LOG(WARNING) + << "System memory request exceeds memory available in " << plasma_directory + << ". The request is for " << system_memory + << " bytes, and the amount available is " << shm_mem_avail + << " bytes. You may be able to free up space by deleting files in " + "/dev/shm. If you are inside a Docker container, you may need to " + "pass an argument with the flag '--shm-size' to 'docker run'."; + system_memory = shm_mem_avail; + } + } else { + SetMallocGranularity(1024 * 1024 * 1024); // 1 GB + } +#endif + // Set system memory capacity + PlasmaAllocator::SetFootprintLimit(static_cast(system_memory)); + ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " + << static_cast(system_memory) / 1000000000 << "GB of memory."; + + // Get external store + std::shared_ptr external_store{nullptr}; + if (!external_store_endpoint.empty()) { + std::string name; + ARROW_CHECK_OK(ExternalStores::ExtractStoreName(external_store_endpoint, &name)); + external_store = ExternalStores::GetStore(name); + if (external_store == nullptr) { + ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; + return -1; + } + ARROW_LOG(DEBUG) << "connecting to external store..."; + ARROW_CHECK_OK(external_store->Connect(external_store_endpoint)); + } + // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); @@ -1212,7 +1267,7 @@ void StartServer(char* socket_name, std::string plasma_directory, bool hugepages int main(int argc, char* argv[]) { ArrowLog::StartArrowLog(argv[0], ArrowLogLevel::ARROW_INFO); ArrowLog::InstallFailureSignalHandler(); - char* socket_name = nullptr; + std::string socket_name; // Directory where plasma memory mapped files are stored. std::string plasma_directory; std::string external_store_endpoint; @@ -1231,17 +1286,12 @@ int main(int argc, char* argv[]) { hugepages_enabled = true; break; case 's': - socket_name = optarg; + socket_name = std::string(optarg); break; case 'm': { char extra; int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); ARROW_CHECK(scanned == 1); - // Set system memory capacity - plasma::PlasmaAllocator::SetFootprintLimit(static_cast(system_memory)); - ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " - << static_cast(system_memory) / 1000000000 - << "GB of memory."; break; } default: @@ -1249,7 +1299,7 @@ int main(int argc, char* argv[]) { } } // Sanity check command line options. - if (!socket_name) { + if (socket_name.empty()) { ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; } if (system_memory == -1) { @@ -1259,59 +1309,9 @@ int main(int argc, char* argv[]) { ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " "filesystem with -d"; } - if (plasma_directory.empty()) { -#ifdef __linux__ - plasma_directory = "/dev/shm"; -#else - plasma_directory = "/tmp"; -#endif - } - ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory - << " and huge page support " - << (hugepages_enabled ? "enabled" : "disabled"); -#ifdef __linux__ - if (!hugepages_enabled) { - // On Linux, check that the amount of memory available in /dev/shm is large - // enough to accommodate the request. If it isn't, then fail. - int shm_fd = open(plasma_directory.c_str(), O_RDONLY); - struct statvfs shm_vfs_stats; - fstatvfs(shm_fd, &shm_vfs_stats); - // The value shm_vfs_stats.f_bsize is the block size, and the value - // shm_vfs_stats.f_bavail is the number of available blocks. - int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; - close(shm_fd); - // Keep some safety margin for allocator fragmentation. - shm_mem_avail = 9 * shm_mem_avail / 10; - if (system_memory > shm_mem_avail) { - ARROW_LOG(WARNING) - << "System memory request exceeds memory available in " << plasma_directory - << ". The request is for " << system_memory - << " bytes, and the amount available is " << shm_mem_avail - << " bytes. You may be able to free up space by deleting files in " - "/dev/shm. If you are inside a Docker container, you may need to " - "pass an argument with the flag '--shm-size' to 'docker run'."; - system_memory = shm_mem_avail; - } - } else { - plasma::SetMallocGranularity(1024 * 1024 * 1024); // 1 GB - } -#endif - // Get external store - std::shared_ptr external_store{nullptr}; - if (!external_store_endpoint.empty()) { - std::string name; - ARROW_CHECK_OK( - plasma::ExternalStores::ExtractStoreName(external_store_endpoint, &name)); - external_store = plasma::ExternalStores::GetStore(name); - if (external_store == nullptr) { - ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; - return -1; - } - ARROW_LOG(DEBUG) << "connecting to external store..."; - ARROW_CHECK_OK(external_store->Connect(external_store_endpoint)); - } ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store); + plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, + external_store_endpoint); plasma::g_runner->Shutdown(); plasma::g_runner = nullptr; diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index a3cd8e4f272..425b15f4372 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include #include @@ -25,11 +26,8 @@ #include #include -#include - #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" - #include "plasma/client.h" #include "plasma/common.h" #include "plasma/plasma.h" @@ -64,13 +62,13 @@ class TestPlasmaStore : public ::testing::Test { plasma_directory + "/plasma-store-server -m 10000000 -s " + store_socket_name_ + " 1> /dev/null 2> /dev/null & " + "echo $! > " + store_socket_name_ + ".pid"; PLASMA_CHECK_SYSTEM(system(plasma_command.c_str())); - ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(client2_.Connect(store_socket_name_, "")); + ASSERT_OK(client_.Connect(store_socket_name_, "")); + ASSERT_OK(client2_.Connect(store_socket_name_, "")); } virtual void TearDown() { - ARROW_CHECK_OK(client_.Disconnect()); - ARROW_CHECK_OK(client2_.Disconnect()); + ASSERT_OK(client_.Disconnect()); + ASSERT_OK(client2_.Disconnect()); // Kill plasma_store process that we started #ifdef COVERAGE_BUILD // Ask plasma_store to exit gracefully and give it time to write out @@ -89,14 +87,14 @@ class TestPlasmaStore : public ::testing::Test { const std::vector& metadata, const std::vector& data, bool release = true) { std::shared_ptr data_buffer; - ARROW_CHECK_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(), - &data_buffer)); + ASSERT_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(), + &data_buffer)); for (size_t i = 0; i < data.size(); i++) { data_buffer->mutable_data()[i] = data[i]; } - ARROW_CHECK_OK(client.Seal(object_id)); + ASSERT_OK(client.Seal(object_id)); if (release) { - ARROW_CHECK_OK(client.Release(object_id)); + ASSERT_OK(client.Release(object_id)); } } @@ -110,8 +108,8 @@ class TestPlasmaStore : public ::testing::Test { TEST_F(TestPlasmaStore, NewSubscriberTest) { PlasmaClient local_client, local_client2; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, "")); + ASSERT_OK(local_client.Connect(store_socket_name_, "")); + ASSERT_OK(local_client2.Connect(store_socket_name_, "")); ObjectID object_id = random_object_id(); @@ -121,46 +119,43 @@ TEST_F(TestPlasmaStore, NewSubscriberTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK( - local_client.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(local_client.Seal(object_id)); + ASSERT_OK(local_client.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(local_client.Seal(object_id)); // Test that new subscriber client2 can receive notifications about existing objects. int fd = -1; - ARROW_CHECK_OK(local_client2.Subscribe(&fd)); + ASSERT_OK(local_client2.Subscribe(&fd)); ASSERT_GT(fd, 0); ObjectID object_id2 = random_object_id(); int64_t data_size2 = 0; int64_t metadata_size2 = 0; - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2)); + ASSERT_OK(local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2)); ASSERT_EQ(object_id, object_id2); ASSERT_EQ(data_size, data_size2); ASSERT_EQ(metadata_size, metadata_size2); // Delete the object. - ARROW_CHECK_OK(local_client.Release(object_id)); - ARROW_CHECK_OK(local_client.Delete(object_id)); + ASSERT_OK(local_client.Release(object_id)); + ASSERT_OK(local_client.Delete(object_id)); - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2)); + ASSERT_OK(local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2)); ASSERT_EQ(object_id, object_id2); ASSERT_EQ(-1, data_size2); ASSERT_EQ(-1, metadata_size2); - ARROW_CHECK_OK(local_client2.Disconnect()); - ARROW_CHECK_OK(local_client.Disconnect()); + ASSERT_OK(local_client2.Disconnect()); + ASSERT_OK(local_client.Disconnect()); } TEST_F(TestPlasmaStore, BatchNotificationTest) { PlasmaClient local_client, local_client2; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, "")); + ASSERT_OK(local_client.Connect(store_socket_name_, "")); + ASSERT_OK(local_client2.Connect(store_socket_name_, "")); int fd = -1; - ARROW_CHECK_OK(local_client2.Subscribe(&fd)); + ASSERT_OK(local_client2.Subscribe(&fd)); ASSERT_GT(fd, 0); ObjectID object_id1 = random_object_id(); @@ -170,25 +165,23 @@ TEST_F(TestPlasmaStore, BatchNotificationTest) { std::vector data = {"hello", "world!"}; std::vector metadata = {"1", "23"}; - ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, data, metadata)); + ASSERT_OK(local_client.CreateAndSealBatch(object_ids, data, metadata)); ObjectID object_id = random_object_id(); int64_t data_size = 0; int64_t metadata_size = 0; - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size)); + ASSERT_OK(local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size)); ASSERT_EQ(object_id, object_id1); ASSERT_EQ(data_size, 5); ASSERT_EQ(metadata_size, 1); - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size)); + ASSERT_OK(local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size)); ASSERT_EQ(object_id, object_id2); ASSERT_EQ(data_size, 6); ASSERT_EQ(metadata_size, 2); - ARROW_CHECK_OK(local_client2.Disconnect()); - ARROW_CHECK_OK(local_client.Disconnect()); + ASSERT_OK(local_client2.Disconnect()); + ASSERT_OK(local_client.Disconnect()); } TEST_F(TestPlasmaStore, SealErrorsTest) { @@ -204,7 +197,7 @@ TEST_F(TestPlasmaStore, SealErrorsTest) { // Trying to seal it again. result = client_.Seal(object_id); ASSERT_TRUE(IsPlasmaObjectAlreadySealed(result)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Release(object_id)); } TEST_F(TestPlasmaStore, SetQuotaBasicTest) { @@ -212,19 +205,19 @@ TEST_F(TestPlasmaStore, SetQuotaBasicTest) { ObjectID id1 = random_object_id(); ObjectID id2 = random_object_id(); - ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); + ASSERT_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); std::vector big_data(3 * 1024 * 1024, 0); // First object fits CreateObject(client_, id1, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); // Evicts first object CreateObject(client_, id2, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_OK(client_.Contains(id2, &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_FALSE(has_object); // Too big to fit in quota at all @@ -244,7 +237,7 @@ TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) { // First object, created without quota CreateObject(client_, id1, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); // Second client creates a bunch of objects @@ -253,13 +246,13 @@ TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) { } // First client's object is evicted - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_FALSE(has_object); // Try again with quota enabled - ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); + ASSERT_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); CreateObject(client_, id2, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_OK(client_.Contains(id2, &has_object)); ASSERT_TRUE(has_object); // Second client creates a bunch of objects @@ -268,7 +261,7 @@ TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) { } // First client's object is not evicted - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_OK(client_.Contains(id2, &has_object)); ASSERT_TRUE(has_object); } @@ -280,17 +273,17 @@ TEST_F(TestPlasmaStore, SetQuotaProtectsOtherClients) { // First client has no quota CreateObject(client_, id1, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); // Second client creates a bunch of objects under a quota - ARROW_CHECK_OK(client2_.SetClientOptions("client2", 5 * 1024 * 1024)); + ASSERT_OK(client2_.SetClientOptions("client2", 5 * 1024 * 1024)); for (int i = 0; i < 10; i++) { CreateObject(client2_, random_object_id(), {42}, big_data, true); } // First client's object is NOT evicted - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); } @@ -315,30 +308,30 @@ TEST_F(TestPlasmaStore, SetQuotaDemotesPinnedObjectsToGlobalLRU) { // Quota is not enough to fit both id1 and id2, but global LRU is CreateObject(client_, id1, {42}, big_data, false); CreateObject(client_, id2, {42}, big_data, false); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_OK(client_.Contains(id2, &has_object)); ASSERT_TRUE(has_object); // Release both objects. Now id1 is in global LRU and id2 is in quota - ARROW_CHECK_OK(client_.Release(id1)); - ARROW_CHECK_OK(client_.Release(id2)); + ASSERT_OK(client_.Release(id1)); + ASSERT_OK(client_.Release(id2)); // This flushes id1 from the object store for (int i = 0; i < 10; i++) { CreateObject(client2_, random_object_id(), {42}, big_data, true); } - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_OK(client_.Contains(id2, &has_object)); ASSERT_TRUE(has_object); } TEST_F(TestPlasmaStore, SetQuotaDemoteDisconnectToGlobalLRU) { bool has_object = false; PlasmaClient local_client; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); + ASSERT_OK(local_client.Connect(store_socket_name_, "")); + ASSERT_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); ObjectID id1 = random_object_id(); std::vector big_data(3 * 1024 * 1024, 0); @@ -348,26 +341,26 @@ TEST_F(TestPlasmaStore, SetQuotaDemoteDisconnectToGlobalLRU) { for (int i = 0; i < 10; i++) { CreateObject(client_, random_object_id(), {42}, big_data, true); } - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); // Object is still present after disconnect - ARROW_CHECK_OK(local_client.Disconnect()); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(local_client.Disconnect()); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_TRUE(has_object); // But is eligible for global LRU for (int i = 0; i < 10; i++) { CreateObject(client_, random_object_id(), {42}, big_data, true); } - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_OK(client_.Contains(id1, &has_object)); ASSERT_FALSE(has_object); } TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) { PlasmaClient local_client; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); + ASSERT_OK(local_client.Connect(store_socket_name_, "")); + ASSERT_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); ObjectID id0 = random_object_id(); ObjectID id1 = random_object_id(); @@ -391,15 +384,15 @@ TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) { ASSERT_TRUE(client_.DebugString().find("(local) num objects: 2") != std::string::npos); // release id0 - ARROW_CHECK_OK(local_client.Release(id0)); + ASSERT_OK(local_client.Release(id0)); ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 1") != std::string::npos); // delete everything - ARROW_CHECK_OK(local_client.Delete(id0)); - ARROW_CHECK_OK(local_client.Delete(id2)); - ARROW_CHECK_OK(local_client.Delete(id3)); - ARROW_CHECK_OK(local_client.Release(id3)); + ASSERT_OK(local_client.Delete(id0)); + ASSERT_OK(local_client.Delete(id2)); + ASSERT_OK(local_client.Delete(id3)); + ASSERT_OK(local_client.Release(id3)); ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos); ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") != std::string::npos); @@ -407,7 +400,7 @@ TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) { std::string::npos); ASSERT_TRUE(client_.DebugString().find("(local) num objects: 0") != std::string::npos); - ARROW_CHECK_OK(local_client.Disconnect()); + ASSERT_OK(local_client.Disconnect()); int tries = 10; // wait for disconnect to complete while (tries > 0 && client_.DebugString().find("num clients with quota: 0") == std::string::npos) { @@ -423,8 +416,8 @@ TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) { TEST_F(TestPlasmaStore, SetQuotaCleanupClientDisconnect) { PlasmaClient local_client; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); + ASSERT_OK(local_client.Connect(store_socket_name_, "")); + ASSERT_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); ObjectID id1 = random_object_id(); ObjectID id2 = random_object_id(); @@ -435,7 +428,7 @@ TEST_F(TestPlasmaStore, SetQuotaCleanupClientDisconnect) { CreateObject(local_client, id2, {42}, big_data, true); CreateObject(local_client, id3, {42}, small_data, false); - ARROW_CHECK_OK(local_client.Disconnect()); + ASSERT_OK(local_client.Disconnect()); int tries = 10; // wait for disconnect to complete while (tries > 0 && client_.DebugString().find("num clients with quota: 0") == std::string::npos) { @@ -468,7 +461,7 @@ TEST_F(TestPlasmaStore, RefreshLRUTest) { // we can fit ten small objects into the store for (const auto& object_id : object_ids) { CreateObject(client_, object_id, {}, small_data, true); - ARROW_CHECK_OK(client_.Contains(object_ids[0], &has_object)); + ASSERT_OK(client_.Contains(object_ids[0], &has_object)); ASSERT_TRUE(has_object); } @@ -476,13 +469,13 @@ TEST_F(TestPlasmaStore, RefreshLRUTest) { CreateObject(client_, id, {}, small_data, true); // the first two objects got evicted (20% of the store) - ARROW_CHECK_OK(client_.Contains(object_ids[0], &has_object)); + ASSERT_OK(client_.Contains(object_ids[0], &has_object)); ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(object_ids[1], &has_object)); + ASSERT_OK(client_.Contains(object_ids[1], &has_object)); ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Refresh({object_ids[2], object_ids[3]})); + ASSERT_OK(client_.Refresh({object_ids[2], object_ids[3]})); id = random_object_id(); CreateObject(client_, id, {}, small_data, true); @@ -490,13 +483,13 @@ TEST_F(TestPlasmaStore, RefreshLRUTest) { CreateObject(client_, id, {}, small_data, true); // the refreshed objects are not evicted - ARROW_CHECK_OK(client_.Contains(object_ids[2], &has_object)); + ASSERT_OK(client_.Contains(object_ids[2], &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_ids[3], &has_object)); + ASSERT_OK(client_.Contains(object_ids[3], &has_object)); ASSERT_TRUE(has_object); // the next object in LRU order is evicted - ARROW_CHECK_OK(client_.Contains(object_ids[4], &has_object)); + ASSERT_OK(client_.Contains(object_ids[4], &has_object)); ASSERT_FALSE(has_object); } @@ -505,7 +498,7 @@ TEST_F(TestPlasmaStore, DeleteTest) { // Test for deleting non-existence object. Status result = client_.Delete(object_id); - ARROW_CHECK_OK(result); + ASSERT_OK(result); // Test for the object being in local Plasma store. // First create object. @@ -513,20 +506,20 @@ TEST_F(TestPlasmaStore, DeleteTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id)); + ASSERT_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Seal(object_id)); result = client_.Delete(object_id); - ARROW_CHECK_OK(result); + ASSERT_OK(result); bool has_object = false; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Release(object_id)); // object_id is marked as to-be-deleted, when it is not in use, it will be deleted. - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Delete(object_id)); + ASSERT_OK(client_.Delete(object_id)); } TEST_F(TestPlasmaStore, DeleteObjectsTest) { @@ -535,31 +528,31 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) { // Test for deleting non-existence object. Status result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); + ASSERT_OK(result); // Test for the object being in local Plasma store. // First create object. int64_t data_size = 100; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id2)); + ASSERT_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Seal(object_id1)); + ASSERT_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Seal(object_id2)); // Release the ref count of Create function. - ARROW_CHECK_OK(client_.Release(object_id1)); - ARROW_CHECK_OK(client_.Release(object_id2)); + ASSERT_OK(client_.Release(object_id1)); + ASSERT_OK(client_.Release(object_id2)); // Increase the ref count by calling Get using client2_. std::vector object_buffers; - ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers)); + ASSERT_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers)); // Objects are still used by client2_. result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); + ASSERT_OK(result); // The object is used and it should not be deleted right now. bool has_object = false; - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); + ASSERT_OK(client_.Contains(object_id1, &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); + ASSERT_OK(client_.Contains(object_id2, &has_object)); ASSERT_TRUE(has_object); // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer). // client2_ won't send the release request immediately because the trigger @@ -567,9 +560,9 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) { object_buffers.clear(); // Delete the objects. result = client2_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); + ASSERT_OK(client_.Contains(object_id1, &has_object)); ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); + ASSERT_OK(client_.Contains(object_id2, &has_object)); ASSERT_FALSE(has_object); } @@ -578,7 +571,7 @@ TEST_F(TestPlasmaStore, ContainsTest) { // Test for object non-existence. bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_FALSE(has_object); // Test for the object being in local Plasma store. @@ -586,8 +579,8 @@ TEST_F(TestPlasmaStore, ContainsTest) { std::vector data(100, 0); CreateObject(client_, object_id, {42}, data); std::vector object_buffers; - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); } @@ -597,7 +590,7 @@ TEST_F(TestPlasmaStore, GetTest) { ObjectID object_id = random_object_id(); // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, 0, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_FALSE(object_buffers[0].metadata); ASSERT_FALSE(object_buffers[0].data); @@ -610,7 +603,7 @@ TEST_F(TestPlasmaStore, GetTest) { EXPECT_FALSE(client_.IsInUse(object_id)); object_buffers.clear(); - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, -1, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_EQ(object_buffers[0].device_num, 0); AssertObjectBufferEqual(object_buffers[0], {42}, {3, 5, 6, 7, 9}); @@ -633,7 +626,7 @@ TEST_F(TestPlasmaStore, LegacyGetTest) { ObjectBuffer object_buffer; // Test for object non-existence. - ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer)); + ASSERT_OK(client_.Get(&object_id, 1, 0, &object_buffer)); ASSERT_FALSE(object_buffer.metadata); ASSERT_FALSE(object_buffer.data); EXPECT_FALSE(client_.IsInUse(object_id)); @@ -643,12 +636,12 @@ TEST_F(TestPlasmaStore, LegacyGetTest) { CreateObject(client_, object_id, {42}, data); EXPECT_FALSE(client_.IsInUse(object_id)); - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer)); + ASSERT_OK(client_.Get(&object_id, 1, -1, &object_buffer)); AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9}); } // Object needs releasing manually EXPECT_TRUE(client_.IsInUse(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Release(object_id)); EXPECT_FALSE(client_.IsInUse(object_id)); } @@ -662,15 +655,15 @@ TEST_F(TestPlasmaStore, MultipleGetTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); data->mutable_data()[0] = 1; - ARROW_CHECK_OK(client_.Seal(object_id1)); + ASSERT_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); data->mutable_data()[0] = 2; - ARROW_CHECK_OK(client_.Seal(object_id2)); + ASSERT_OK(client_.Seal(object_id2)); - ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers)); + ASSERT_OK(client_.Get(object_ids, -1, &object_buffers)); ASSERT_EQ(object_buffers[0].data->data()[0], 1); ASSERT_EQ(object_buffers[1].data->data()[0], 2); } @@ -683,11 +676,11 @@ TEST_F(TestPlasmaStore, BatchCreateTest) { std::vector data = {"hello", "world"}; std::vector metadata = {"1", "2"}; - ARROW_CHECK_OK(client_.CreateAndSealBatch(object_ids, data, metadata)); + ASSERT_OK(client_.CreateAndSealBatch(object_ids, data, metadata)); std::vector object_buffers; - ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers)); + ASSERT_OK(client_.Get(object_ids, -1, &object_buffers)); std::string out1, out2; out1.assign(reinterpret_cast(object_buffers[0].data->data()), @@ -704,7 +697,7 @@ TEST_F(TestPlasmaStore, AbortTest) { std::vector object_buffers; // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, 0, &object_buffers)); ASSERT_FALSE(object_buffers[0].data); // Test object abort. @@ -714,7 +707,7 @@ TEST_F(TestPlasmaStore, AbortTest) { int64_t metadata_size = sizeof(metadata); std::shared_ptr data; uint8_t* data_ptr; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); data_ptr = data->mutable_data(); // Write some data. for (int64_t i = 0; i < data_size / 2; i++) { @@ -724,21 +717,21 @@ TEST_F(TestPlasmaStore, AbortTest) { Status status = client_.Abort(object_id); ASSERT_TRUE(status.IsInvalid()); // Release, then abort. - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Release(object_id)); EXPECT_TRUE(client_.IsInUse(object_id)); - ARROW_CHECK_OK(client_.Abort(object_id)); + ASSERT_OK(client_.Abort(object_id)); EXPECT_FALSE(client_.IsInUse(object_id)); // Test for object non-existence after the abort. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, 0, &object_buffers)); ASSERT_FALSE(object_buffers[0].data); // Create the object successfully this time. CreateObject(client_, object_id, {42, 43}, {1, 2, 3, 4, 5}); // Test that we can get the object. - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, -1, &object_buffers)); AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5}); } @@ -749,7 +742,7 @@ TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) { std::vector object_buffers; // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, 0, &object_buffers)); ASSERT_FALSE(object_buffers[0].data); int64_t data_size = 20; @@ -759,18 +752,18 @@ TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) { // Test the sequence: create -> release -> abort -> ... for (int64_t i = 0; i < loop_times; i++) { std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Abort(object_id)); + ASSERT_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Release(object_id)); + ASSERT_OK(client_.Abort(object_id)); } // Test the sequence: create -> seal -> release -> delete -> ... for (int64_t i = 0; i < loop_times; i++) { std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Delete(object_id)); + ASSERT_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Seal(object_id)); + ASSERT_OK(client_.Release(object_id)); + ASSERT_OK(client_.Delete(object_id)); } } @@ -780,7 +773,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) { // Test for object non-existence on the first client. bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_FALSE(has_object); // Test for the object being in local Plasma store. @@ -789,25 +782,25 @@ TEST_F(TestPlasmaStore, MultipleClientTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client2_.Seal(object_id)); + ASSERT_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client2_.Seal(object_id)); // Test that the first client can get the object. - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, -1, &object_buffers)); ASSERT_TRUE(object_buffers[0].data); - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); // Test that one client disconnecting does not interfere with the other. // First create object on the second client. object_id = random_object_id(); - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); // Disconnect the first client. - ARROW_CHECK_OK(client_.Disconnect()); + ASSERT_OK(client_.Disconnect()); // Test that the second client can seal and get the created object. - ARROW_CHECK_OK(client2_.Seal(object_id)); - ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client2_.Seal(object_id)); + ASSERT_OK(client2_.Get({object_id}, -1, &object_buffers)); ASSERT_TRUE(object_buffers[0].data); - ARROW_CHECK_OK(client2_.Contains(object_id, &has_object)); + ASSERT_OK(client2_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); } @@ -821,7 +814,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { // Test for object non-existence on the first client. bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_FALSE(has_object); // Test for the object being in local Plasma store. @@ -830,29 +823,29 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ASSERT_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); if (i % 3 == 0) { // Seal one third of the objects. - ARROW_CHECK_OK(client_.Seal(object_id)); + ASSERT_OK(client_.Seal(object_id)); // Test that the first client can get the object. - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); } else if (i % 3 == 1) { // Abort one third of the objects. - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Abort(object_id)); + ASSERT_OK(client_.Release(object_id)); + ASSERT_OK(client_.Abort(object_id)); } } // Disconnect the first client. All unsealed objects should be aborted. - ARROW_CHECK_OK(client_.Disconnect()); + ASSERT_OK(client_.Disconnect()); // Check that the second client can query the object store for the first // client's objects. int i = 0; for (auto const& object_id : object_ids) { bool has_object; - ARROW_CHECK_OK(client2_.Contains(object_id, &has_object)); + ASSERT_OK(client2_.Contains(object_id, &has_object)); if (i % 3 == 0) { // The first third should be sealed. ASSERT_TRUE(has_object); @@ -898,7 +891,7 @@ TEST_F(TestPlasmaStore, GetGPUTest) { std::vector object_buffers; // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, 0, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_FALSE(object_buffers[0].data); @@ -910,15 +903,15 @@ TEST_F(TestPlasmaStore, GetGPUTest) { int64_t metadata_size = sizeof(metadata); std::shared_ptr data_buffer; std::shared_ptr gpu_buffer; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, - &data_buffer, kGpuDeviceNumber)); + ASSERT_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer, + kGpuDeviceNumber)); ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(data_buffer)); CudaBufferWriter writer(gpu_buffer); - ARROW_CHECK_OK(writer.Write(data, data_size)); - ARROW_CHECK_OK(client_.Seal(object_id)); + ASSERT_OK(writer.Write(data, data_size)); + ASSERT_OK(client_.Seal(object_id)); object_buffers.clear(); - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client_.Get({object_id}, -1, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_EQ(object_buffers[0].device_num, kGpuDeviceNumber); // Check data @@ -933,33 +926,33 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) { // Test for deleting non-existence object. Status result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); + ASSERT_OK(result); // Test for the object being in local Plasma store. // First create object. int64_t data_size = 100; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id2)); + ASSERT_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data, + kGpuDeviceNumber)); + ASSERT_OK(client_.Seal(object_id1)); + ASSERT_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data, + kGpuDeviceNumber)); + ASSERT_OK(client_.Seal(object_id2)); // Release the ref count of Create function. - ARROW_CHECK_OK(client_.Release(object_id1)); - ARROW_CHECK_OK(client_.Release(object_id2)); + ASSERT_OK(client_.Release(object_id1)); + ASSERT_OK(client_.Release(object_id2)); // Increase the ref count by calling Get using client2_. std::vector object_buffers; - ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers)); + ASSERT_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers)); // Objects are still used by client2_. result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); + ASSERT_OK(result); // The object is used and it should not be deleted right now. bool has_object = false; - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); + ASSERT_OK(client_.Contains(object_id1, &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); + ASSERT_OK(client_.Contains(object_id2, &has_object)); ASSERT_TRUE(has_object); // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer). // client2_ won't send the release request immediately because the trigger @@ -967,9 +960,9 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) { object_buffers.clear(); // Delete the objects. result = client2_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); + ASSERT_OK(client_.Contains(object_id1, &has_object)); ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); + ASSERT_OK(client_.Contains(object_id2, &has_object)); ASSERT_FALSE(has_object); } @@ -986,25 +979,25 @@ TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) { ObjectID& object_id = object_ids[i]; std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); + ASSERT_OK(client_.Seal(object_id)); + ASSERT_OK(client_.Release(object_id)); } // delete and create again for (int64_t i = 0; i < loop_times; i++) { ObjectID& object_id = object_ids[i % object_num]; - ARROW_CHECK_OK(client_.Delete(object_id)); + ASSERT_OK(client_.Delete(object_id)); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); + ASSERT_OK(client_.Seal(object_id)); + ASSERT_OK(client_.Release(object_id)); } // delete all - ARROW_CHECK_OK(client_.Delete(object_ids)); + ASSERT_OK(client_.Delete(object_ids)); } TEST_F(TestPlasmaStore, GPUBufferLifetime) { @@ -1013,23 +1006,23 @@ TEST_F(TestPlasmaStore, GPUBufferLifetime) { const int64_t data_size = 40; std::shared_ptr create_buff; - ARROW_CHECK_OK( + ASSERT_OK( client_.Create(object_id, data_size, nullptr, 0, &create_buff, kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Seal(object_id)); + ASSERT_OK(client_.Release(object_id)); ObjectBuffer get_buff_1; - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_1)); + ASSERT_OK(client_.Get(&object_id, 1, -1, &get_buff_1)); ObjectBuffer get_buff_2; - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_2)); - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Get(&object_id, 1, -1, &get_buff_2)); + ASSERT_OK(client_.Release(object_id)); + ASSERT_OK(client_.Release(object_id)); ObjectBuffer get_buff_3; - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_3)); - ARROW_CHECK_OK(client_.Release(object_id)); + ASSERT_OK(client_.Get(&object_id, 1, -1, &get_buff_3)); + ASSERT_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Delete(object_id)); + ASSERT_OK(client_.Delete(object_id)); } TEST_F(TestPlasmaStore, MultipleClientGPUTest) { @@ -1038,7 +1031,7 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) { // Test for object non-existence on the first client. bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_FALSE(has_object); // Test for the object being in local Plasma store. @@ -1047,27 +1040,27 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - ARROW_CHECK_OK(client2_.Seal(object_id)); + ASSERT_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, + kGpuDeviceNumber)); + ASSERT_OK(client2_.Seal(object_id)); // Test that the first client can get the object. - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); // Test that one client disconnecting does not interfere with the other. // First create object on the second client. object_id = random_object_id(); - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); + ASSERT_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, + kGpuDeviceNumber)); // Disconnect the first client. - ARROW_CHECK_OK(client_.Disconnect()); + ASSERT_OK(client_.Disconnect()); // Test that the second client can seal and get the created object. - ARROW_CHECK_OK(client2_.Seal(object_id)); + ASSERT_OK(client2_.Seal(object_id)); object_buffers.clear(); - ARROW_CHECK_OK(client2_.Contains(object_id, &has_object)); + ASSERT_OK(client2_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers)); + ASSERT_OK(client2_.Get({object_id}, -1, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_EQ(object_buffers[0].device_num, kGpuDeviceNumber); AssertCudaRead(object_buffers[0].metadata, {5}); diff --git a/cpp/src/plasma/test/external_store_tests.cc b/cpp/src/plasma/test/external_store_tests.cc index 804e9cdc8bf..bc4bc133348 100644 --- a/cpp/src/plasma/test/external_store_tests.cc +++ b/cpp/src/plasma/test/external_store_tests.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include #include @@ -25,11 +26,8 @@ #include #include -#include - #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" - #include "plasma/client.h" #include "plasma/common.h" #include "plasma/external_store.h" @@ -65,11 +63,11 @@ class TestPlasmaStoreWithExternal : public ::testing::Test { " 1> /tmp/log.stdout 2> /tmp/log.stderr & " + "echo $! > " + store_socket_name_ + ".pid"; PLASMA_CHECK_SYSTEM(system(plasma_command.c_str())); - ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); + ASSERT_OK(client_.Connect(store_socket_name_, "")); } void TearDown() override { - ARROW_CHECK_OK(client_.Disconnect()); + ASSERT_OK(client_.Disconnect()); // Kill plasma_store process that we started #ifdef COVERAGE_BUILD // Ask plasma_store to exit gracefully and give it time to write out @@ -100,14 +98,14 @@ TEST_F(TestPlasmaStoreWithExternal, EvictionTest) { // Test for object non-existence. bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_FALSE(has_object); // Test for the object being in local Plasma store. // Create and seal the object. - ARROW_CHECK_OK(client_.CreateAndSeal(object_id, data, metadata)); + ASSERT_OK(client_.CreateAndSeal(object_id, data, metadata)); // Test that the client can get the object. - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); } @@ -118,7 +116,7 @@ TEST_F(TestPlasmaStoreWithExternal, EvictionTest) { // external store on failure. This should succeed to fetch the object. // However, it may evict the next few objects. std::vector object_buffers; - ARROW_CHECK_OK(client_.Get({object_ids[i]}, -1, &object_buffers)); + ASSERT_OK(client_.Get({object_ids[i]}, -1, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_EQ(object_buffers[0].device_num, 0); ASSERT_TRUE(object_buffers[0].data); @@ -127,7 +125,7 @@ TEST_F(TestPlasmaStoreWithExternal, EvictionTest) { // Make sure we still cannot fetch objects that do not exist std::vector object_buffers; - ARROW_CHECK_OK(client_.Get({random_object_id()}, 100, &object_buffers)); + ASSERT_OK(client_.Get({random_object_id()}, 100, &object_buffers)); ASSERT_EQ(object_buffers.size(), 1); ASSERT_EQ(object_buffers[0].device_num, 0); ASSERT_EQ(object_buffers[0].data, nullptr); From fc0d9dd28f87c9fde63c993da8714890fa97bd9b Mon Sep 17 00:00:00 2001 From: Siyuan Date: Wed, 11 Mar 2020 21:28:36 -0700 Subject: [PATCH 2/2] fix --- cpp/src/plasma/store.cc | 3 +-- cpp/src/plasma/store.h | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 9d7f2f1f35c..11abb8e09d8 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -1247,7 +1247,6 @@ void StartServer(std::string socket_name, int64_t system_memory, external_store = ExternalStores::GetStore(name); if (external_store == nullptr) { ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; - return -1; } ARROW_LOG(DEBUG) << "connecting to external store..."; ARROW_CHECK_OK(external_store->Connect(external_store_endpoint)); @@ -1310,7 +1309,7 @@ int main(int argc, char* argv[]) { "filesystem with -d"; } ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, + plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled, external_store_endpoint); plasma::g_runner->Shutdown(); plasma::g_runner = nullptr; diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 7c3b5fbe3fb..715eb69f4b2 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -58,8 +58,7 @@ class PlasmaStore { public: using NotificationMap = std::unordered_map; - // TODO: PascalCase PlasmaStore methods. - PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, + PlasmaStore(EventLoop* loop, const std::string& directory, bool hugepages_enabled, const std::string& socket_name, std::shared_ptr external_store);