diff --git a/src/fdb5/toc/TocHandler.cc b/src/fdb5/toc/TocHandler.cc index 1a61f074d..1f261c23e 100644 --- a/src/fdb5/toc/TocHandler.cc +++ b/src/fdb5/toc/TocHandler.cc @@ -720,12 +720,43 @@ class SubtocPreloader { } AutoFDCloser& operator=(const AutoFDCloser&) = delete; ~AutoFDCloser() { - if (fd_ > 0) { + if (fd_ >= 0) { ::close(fd_); // n.b. ignore return value } } }; + /// Ensure we are cleaning up any aio request in case of stack unwinding + struct AioCanceller { + std::vector& ptrs_; + + explicit AioCanceller(std::vector& ptrs) : ptrs_(ptrs) {} + ~AioCanceller() { + for (auto* p : ptrs_) { + if (p) { + ::aio_cancel(p->aio_fildes, p); + } + } + for (auto* p : ptrs_) { + if (!p) { + continue; + } + + int err = ::aio_error(p); + if (err == EINVAL) { + continue; + } // not a valid outstanding request + + while (err == EINPROGRESS) { + ::aio_suspend(&p, 1, nullptr); + err = ::aio_error(p); + } + // err is now 0 (completed), ECANCELED, or another errno. + // Must call aio_return exactly once to reap. + ::aio_return(p); + } + } + }; const Key& parentKey_; mutable std::map> subTocReadCache_; @@ -748,20 +779,17 @@ class SubtocPreloader { #endif std::vector aiocbs(paths_.size()); - std::vector buffers(paths_.size()); + std::vector aiocbPtrs(aiocbs.size(), nullptr); + std::vector> buffers(paths_.size()); std::vector closers; - std::vector done(paths_.size()); - ::memset(done.data(), 0, done.size() * sizeof(char)); - ::memset(aiocbs.data(), 0, sizeof(aiocb) * aiocbs.size()); + AioCanceller ac(aiocbPtrs); { eckit::Timer sstime("subtocs.statsubmit", Log::debug()); - for (int i = 0; i < aiocbs.size(); ++i) { + for (size_t i = 0; i < aiocbs.size(); ++i) { const eckit::LocalPathName& path = paths_[i]; - - int fd; - SYSCALL2((fd = ::open(path.localPath(), iomode)), path); + const int fd = SYSCALL2(::open(path.localPath(), iomode), path); closers.emplace_back(AutoFDCloser{fd}); eckit::Length tocSize = path.size(); @@ -777,64 +805,72 @@ class SubtocPreloader { aio.aio_buf = buffers[i].data(); SYSCALL(::aio_read(&aio)); + aiocbPtrs[i] = &aio; } } - std::vector aiocbPtrs(aiocbs.size()); - for (int i = 0; i < aiocbs.size(); ++i) { - aiocbPtrs[i] = &aiocbs[i]; - } - - int doneCount = 0; - { eckit::Timer sstime("subtocs.collect", Log::debug()); - while (doneCount < aiocbs.size()) { + while (std::any_of(std::begin(aiocbPtrs), std::end(aiocbPtrs), + [](const auto ptr) { return ptr != nullptr; })) { // Now wait until data is ready from at least one read - + struct timespec timeout{}; + timeout.tv_sec = 1; errno = 0; - while (::aio_suspend(aiocbPtrs.data(), aiocbs.size(), nullptr) < 0) { - if (errno != EINTR) { - throw FailedSystemCall("aio_suspend", Here(), errno); + while (::aio_suspend(aiocbPtrs.data(), aiocbPtrs.size(), &timeout) < 0) { + switch (errno) { + case EINTR: + // got interrupted, continue sleeping + continue; + case EAGAIN: + // timeout reached + + LOG_DEBUG_LIB(LibFdb5) << "Reading subtocs - timeout(1s) reached for:\n"; + for (size_t index = 0; index < aiocbPtrs.size(); ++index) { + if (aiocbPtrs[index] == nullptr) { + continue; + } + LOG_DEBUG_LIB(LibFdb5) << paths_[index].localPath() << "\n"; + } + LOG_DEBUG_LIB(LibFdb5) << "retrying." << std::endl; + continue; + default: + throw FailedSystemCall("aio_suspend", Here(), errno); } } // Find which one(s) are ready - - for (int n = 0; n < aiocbs.size(); ++n) { - - if (done[n]) { + for (size_t n = 0; n < aiocbPtrs.size(); ++n) { + if (aiocbPtrs[n] == nullptr) { + // already completed, skip continue; } - - int e = ::aio_error(&aiocbs[n]); + const int e = ::aio_error(aiocbPtrs[n]); if (e == EINPROGRESS) { continue; } - if (e == 0) { - - ssize_t len = ::aio_return(&aiocbs[n]); + auto aioptr = aiocbPtrs[n]; + aiocbPtrs[n] = nullptr; + const auto len = SYSCALL(::aio_return(aioptr)); if (len != buffers[n].size()) { - aiocbs[n].aio_nbytes = len; + // File has been truncated since stat has been called + throw eckit::ShortFile(paths_[n], Here()); } bool grow = true; auto cachedToc = std::make_unique(buffers[n].size(), grow); { - cachedToc->openForWrite(aiocbs[n].aio_nbytes); + cachedToc->openForWrite(buffers[n].size()); AutoClose closer(*cachedToc); - ASSERT(cachedToc->write(buffers[n].data(), aiocbs[n].aio_nbytes) == aiocbs[n].aio_nbytes); + ASSERT(cachedToc->write(buffers[n].data(), buffers[n].size()) == buffers[n].size()); } ASSERT(subTocReadCache_.find(paths_[n]) == subTocReadCache_.end()); subTocReadCache_.emplace( paths_[n], std::make_unique(paths_[n], parentKey_, cachedToc.release())); - - done[n] = true; - doneCount++; } else { throw FailedSystemCall("aio_error", Here(), e);