Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion backend/include/backend/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ class Session
void registerRpcSftpAddBulkUploadOperation();
void registerRpcSftpAddBulkDeleteOperation();
void registerRpcSftpExistsBatch();
void registerRpcSftpAddSyncScanOperation();
void registerRpcSftpOpenSyncSession();
void registerRpcSftpRecomputeSyncDiff();
void registerRpcSftpLoadSyncDiffChildren();
void registerRpcSftpBuildSyncEnqueuePlan();
void registerRpcSftpCancelSyncDiff();
void registerRpcSftpCloseSyncSession();
void registerOperationQueuePauseUnpause();

void removeChannel(Ids::ChannelId channelId);
Expand Down
43 changes: 41 additions & 2 deletions backend/include/backend/sftp/local_scan_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <backend/sftp/operation.hpp>
#include <nui/utility/move_detector.hpp>
#include <shared_data/ignore_rules.hpp>
#include <shared_data/sync/scan_node.hpp>
#include <shared_data/sync/tree_directory_walker.hpp>
#include <utility/directory_traversal.hpp>

#include <cstdint>
Expand All @@ -26,6 +28,9 @@ class LocalScanOperation : public Operation
bool recursive{true};
/// When true, entries whose filename starts with '.' are skipped during scan.
bool ignoreHidden{false};
/// When true, the operation builds a sorted @ref SharedData::Sync::ScanNode tree
/// instead of a flat entry vector.
bool buildTree{false};
};

LocalScanOperation(ScanOperationOptions options);
Expand Down Expand Up @@ -81,10 +86,25 @@ class LocalScanOperation : public Operation
return fn(static_cast<WalkerType&>(*walker_));
}

/**
* @brief Variant of @ref withWalkerDo that drives a sorted ScanNode tree walker.
* Only call when @ref ScanOperationOptions::buildTree was set.
*/
auto withTreeWalkerDo(auto&& fn)
{
auto scan = [this](std::filesystem::path const& path)
{
return scanner(path);
};
using WalkerType = SharedData::Sync::TreeDirectoryWalker<Error, decltype(scan), true>;
if (!walker_)
walker_ = std::make_unique<WalkerType>(localPath_, std::move(scan));
return fn(static_cast<WalkerType&>(*walker_));
}

/**
* @brief Eject the scanned directory entries. Careful!: The internal list is moved out.
*
* @return std::vector<SharedData::DirectoryEntry>
* Only valid when @ref buildsTree() is false.
*/
std::vector<SharedData::DirectoryEntry> ejectEntries()
{
Expand All @@ -96,6 +116,24 @@ class LocalScanOperation : public Operation
);
}

/**
* @brief Eject the scanned node tree. Only valid when @ref buildsTree() is true.
*/
SharedData::Sync::ScanNode ejectScanTree()
{
return withTreeWalkerDo(
[](auto& walker)
{
return std::move(walker).ejectTree();
}
);
}

bool buildsTree() const noexcept
{
return buildTree_;
}

std::uint64_t totalBytes() const;

std::expected<std::vector<SharedData::DirectoryEntry>, Error> scanner(std::filesystem::path const& path);
Expand All @@ -107,6 +145,7 @@ class LocalScanOperation : public Operation
bool respectIgnoreFiles_;
bool recursive_;
bool ignoreHidden_;
bool buildTree_;
bool rootScanned_{false};
SharedData::IgnoreMatcher ignoreMatcher_;
std::unique_ptr<Utility::BaseDirectoryWalker> walker_;
Expand Down
37 changes: 34 additions & 3 deletions backend/include/backend/sftp/operation_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <backend/sftp/all_operations.hpp>
#include <backend/sftp/bulk_resume_registry.hpp>
#include <backend/sync/sync_session.hpp>
#include <persistence/state/state.hpp>
#include <ssh/sftp_session.hpp>
#include <nui/rpc.hpp>
Expand All @@ -11,6 +12,10 @@
#include <shared_data/file_operations/operation_mode.hpp>
#include <shared_data/file_operations/bulk_add_request.hpp>
#include <shared_data/directory_entry.hpp>
#include <shared_data/sync/diff.hpp>
#include <shared_data/sync/diff_summary.hpp>
#include <shared_data/sync/diff_tree_node.hpp>
#include <shared_data/sync/scan_node.hpp>

#include <deque>
#include <filesystem>
Expand Down Expand Up @@ -224,8 +229,14 @@ class OperationQueue
* @param remotePath Remote directory root to scan.
* @param localPath Local directory root to scan.
*/
/**
* @brief Opens a @ref SyncSession and kicks off both scans. The scans build
* ScanNode trees directly and feed them into the session on completion.
* Emits @c onSyncScanPhaseDone(sessionId, isLocal) for each side.
*/
void addSyncScanOperation(
SecureShell::SftpSession& sftp,
Ids::SyncSessionId syncSessionId,
Ids::OperationId remoteScanId,
Ids::OperationId localScanId,
std::filesystem::path const& remotePath,
Expand All @@ -235,6 +246,20 @@ class OperationQueue
bool ignoreHidden
);

/**
* @brief Returns the session with the given id, or nullptr when it has been
* closed / is unknown. Callers must post onto @ref SyncSession::strand()
* before touching mutable state.
*/
std::shared_ptr<SyncSession> syncSession(Ids::SyncSessionId const& sessionId) const;

/**
* @brief Removes the session from the registry. Flips its cancel flag first so
* any in-flight walk exits at its next checkpoint. The destructor runs
* once the last @c shared_ptr reference (captured tasks + this map) drops.
*/
void closeSyncSession(Ids::SyncSessionId const& sessionId);

void registerRpc();

bool paused() const;
Expand Down Expand Up @@ -295,9 +320,15 @@ class OperationQueue
std::deque<std::pair<Ids::OperationId, std::unique_ptr<Operation>>> operations_{};
std::atomic_bool paused_{true};
int parallelism_{1};
// Keyed by operationId.value(). Called when a sync-only scan completes and ejects its results.
std::map<std::string, std::function<void(std::vector<SharedData::DirectoryEntry>, std::uint64_t)>>
syncScanCallbacks_{};
// Keyed by operationId.value(). Called when a sync-only scan completes and
// hands off its ScanNode tree. The scan is known to be build-tree mode so
// @ref ScanOperation::ejectScanTree() has the payload.
std::map<std::string, std::function<void(SharedData::Sync::ScanNode)>> syncScanCallbacks_{};

// SyncSessions active on this channel. Keyed by SyncSessionId.value() because
// Ids::IdHash isn't specialized on the strongly-typed id; the existing
// bulkResumeStash_ does the specialization — we stay string-keyed for simplicity.
std::map<std::string, std::shared_ptr<SyncSession>> syncSessions_{};

// Per-bulk backup snapshot, populated when a bulk-add path enqueues an
// op and erased once the op completes (success or failure). Keyed by
Expand Down
44 changes: 42 additions & 2 deletions backend/include/backend/sftp/scan_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <backend/sftp/operation.hpp>
#include <nui/utility/move_detector.hpp>
#include <shared_data/ignore_rules.hpp>
#include <shared_data/sync/scan_node.hpp>
#include <shared_data/sync/tree_directory_walker.hpp>
#include <utility/directory_traversal.hpp>

#include <filesystem>
Expand All @@ -25,6 +27,10 @@ class ScanOperation : public Operation
bool recursive{true};
/// When true, entries whose filename starts with '.' are skipped during scan.
bool ignoreHidden{false};
/// When true, the operation builds a sorted @ref SharedData::Sync::ScanNode tree
/// instead of a flat entry vector. Only one shape is active at a time; use
/// @ref ejectScanTree() in that case and @ref ejectEntries() otherwise.
bool buildTree{false};
};

SecureShell::ProcessingStrand* strand() const override;
Expand Down Expand Up @@ -80,10 +86,25 @@ class ScanOperation : public Operation
return fn(static_cast<WalkerType&>(*walker_));
}

/**
* @brief Variant of @ref withWalkerDo that drives a sorted ScanNode tree walker.
* Only call when @ref ScanOperationOptions::buildTree was set.
*/
auto withTreeWalkerDo(auto&& fn)
{
auto scan = [this](std::filesystem::path const& path)
{
return scanner(path);
};
using WalkerType = SharedData::Sync::TreeDirectoryWalker<Error, decltype(scan), true>;
if (!walker_)
walker_ = std::make_unique<WalkerType>(remotePath_, std::move(scan));
return fn(static_cast<WalkerType&>(*walker_));
}

/**
* @brief Eject the scanned directory entries. Careful!: The internal list is moved out.
*
* @return std::vector<SharedData::DirectoryEntry>
* Only valid when @ref buildsTree() is false.
*/
std::vector<SharedData::DirectoryEntry> ejectEntries()
{
Expand All @@ -95,6 +116,24 @@ class ScanOperation : public Operation
);
}

/**
* @brief Eject the scanned node tree. Only valid when @ref buildsTree() is true.
*/
SharedData::Sync::ScanNode ejectScanTree()
{
return withTreeWalkerDo(
[](auto& walker)
{
return std::move(walker).ejectTree();
}
);
}

bool buildsTree() const noexcept
{
return buildTree_;
}

std::uint64_t totalBytes() const;

std::expected<std::vector<SharedData::DirectoryEntry>, Error> scanner(std::filesystem::path const& path);
Expand All @@ -108,6 +147,7 @@ class ScanOperation : public Operation
bool respectIgnoreFiles_;
bool recursive_;
bool ignoreHidden_;
bool buildTree_;
bool rootScanned_{false};
SharedData::IgnoreMatcher ignoreMatcher_;
std::unique_ptr<Utility::BaseDirectoryWalker> walker_;
Expand Down
Loading