diff --git a/src/borg/archive.py b/src/borg/archive.py index 16ab75e31e..3ea0f413cb 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1,4 +1,5 @@ import errno +import io import os import socket import stat @@ -22,7 +23,7 @@ from .compress import Compressor from .constants import * # NOQA from .hashindex import ChunkIndex, ChunkIndexEntry -from .helpers import Manifest +from .helpers import Manifest, slice_chunks from .helpers import Chunk, ChunkIteratorFileWrapper, open_item from .helpers import Error, IntegrityError from .helpers import uid2user, user2uid, gid2group, group2gid @@ -428,7 +429,7 @@ def add_file_chunks(chunks): return stats def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False, - hardlink_masters=None, stripped_components=0, original_path=None, pi=None): + hardlink_masters=None, complete_partial=False, stripped_components=0, original_path=None, pi=None): """ Extract archive item. @@ -437,6 +438,7 @@ def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sp :param dry_run: do not write any data :param stdout: write extracted data to stdout :param sparse: write sparse files (chunk-granularity, independent of the original being sparse) + :param complete_partial: False: replace files, True: existing files are completed :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly :param stripped_components: stripped leading path components to correct hard link extraction :param original_path: 'path' key as stored in archive @@ -466,14 +468,15 @@ def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sp # Attempt to remove existing files, ignore errors on failure try: st = os.lstat(path) - if stat.S_ISDIR(st.st_mode): - os.rmdir(path) - else: - os.unlink(path) + if not complete_partial: + if stat.S_ISDIR(st.st_mode): + os.rmdir(path) + else: + os.unlink(path) except UnicodeEncodeError: raise 
self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None except OSError: - pass + st = None mode = item.mode if stat.S_ISREG(mode): with backup_io(): @@ -496,9 +499,30 @@ def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sp return # Extract chunks, since the item which had the chunks was not extracted with backup_io(): - fd = open(path, 'wb') + if complete_partial and st is not None: + # Open existing file for updating + # Note that 'ab' wouldn't work on e.g. NetBSD, since seeking would be meaningless for writes. + # However, r+b requires the file to exist. Therefore we need to distinguish the two cases here. + fd = open(path, 'r+b') + else: + fd = open(path, 'wb') with fd: - ids = [c.id for c in item.chunks] + chunks = item.chunks + if complete_partial: + with backup_io(): + fd.seek(0, io.SEEK_END) + existing_length = fd.tell() + # Slice chunks by current length of the existing file. + chunks, prefix_length = slice_chunks(chunks, maximum_length=existing_length) + # We don't bother extracting fractional chunks. Just seek to a chunk boundary. 
+ fd.seek(prefix_length) + fd.truncate() + discarded_count = len(item.chunks) - len(chunks) + discarded_chunks_ids = [c.id for c in item.chunks[:discarded_count]] + self.repository.discard_preload(discarded_chunks_ids) + if pi: + pi.show(increase=prefix_length) + ids = [c.id for c in chunks] for _, data in self.pipeline.fetch_many(ids, is_preloaded=True): if pi: pi.show(increase=len(data)) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 3abe7a5069..790a91b38a 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -487,11 +487,16 @@ def do_extract(self, args, repository, manifest, key, archive): dry_run = args.dry_run stdout = args.stdout sparse = args.sparse + continue_ = args.continue_ strip_components = args.strip_components dirs = [] partial_extract = not matcher.empty() or strip_components hardlink_masters = {} if partial_extract else None + if stdout and continue_: + self.print_error('Cannot combine --stdout and --continue.') + return self.exit_code + def peek_and_store_hardlink_masters(item, matched): if (partial_extract and not matched and stat.S_ISREG(item.mode) and item.get('hardlink_master', True) and 'source' not in item): @@ -527,8 +532,9 @@ def peek_and_store_hardlink_masters(item, matched): dirs.append(item) archive.extract_item(item, restore_attrs=False) else: - archive.extract_item(item, stdout=stdout, sparse=sparse, hardlink_masters=hardlink_masters, - stripped_components=strip_components, original_path=orig_path, pi=pi) + archive.extract_item(item, stdout=stdout, complete_partial=continue_, sparse=sparse, + hardlink_masters=hardlink_masters, original_path=orig_path, pi=pi, + stripped_components=strip_components) except BackupOSError as e: self.print_warning('%s: %s', remove_surrogates(orig_path), e) @@ -1867,6 +1873,9 @@ def build_parser(self, prog=None): subparser.add_argument('--sparse', dest='sparse', action='store_true', default=False, help='create holes in output sparse file from all-zero chunks') + 
subparser.add_argument('--continue', dest='continue_', + action='store_true', default=False, + help='continue interrupted extraction') subparser.add_argument('location', metavar='ARCHIVE', type=location_validator(archive=True), help='archive to extract') diff --git a/src/borg/helpers.py b/src/borg/helpers.py index 6d6b8c7e75..bb7fe70846 100644 --- a/src/borg/helpers.py +++ b/src/borg/helpers.py @@ -23,7 +23,7 @@ from datetime import datetime, timezone, timedelta from fnmatch import translate from functools import wraps, partial, lru_cache -from itertools import islice +from itertools import islice, dropwhile from operator import attrgetter from string import Formatter @@ -1769,3 +1769,21 @@ def swidth_slice(string, max_width): if reverse: result.reverse() return ''.join(result) + + +def slice_chunks(chunks, maximum_length): + """ + Slice *chunks* (list(ChunkListEntry)) to remove the longest prefix of whole chunks totalling at most *maximum_length*. + + Return (sliced_chunks, prefix_length). + """ + def should_drop(chunk): + nonlocal current_length + dropped = (current_length + chunk.size) <= maximum_length + if dropped: + current_length += chunk.size + return dropped + + current_length = 0 + sliced_chunks = dropwhile(should_drop, chunks) + return list(sliced_chunks), current_length diff --git a/src/borg/remote.py b/src/borg/remote.py index 294bd40cf7..e029bc6ef2 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -506,6 +506,22 @@ def close(self): def preload(self, ids): self.preload_ids += ids + def discard_preload(self, discard_ids): + # Stop preloading these immediately + self.preload_ids[:] = [id for id in self.preload_ids + if id not in discard_ids] + + for discard_id in discard_ids: + msgids = self.cache.get((discard_id,), []) + for msgid in list(msgids): + if msgid in self.responses: + # Discard anything we may have already received + self.responses.pop(msgid) + msgids.remove(msgid) + else: + # The call was sent out, but no response yet; if we get one, we'll ignore it 
self.ignore_responses.add(msgid) + def handle_remote_line(line): if line.startswith('$LOG '): diff --git a/src/borg/repository.py b/src/borg/repository.py index 095a9ba7f5..a98c60b1dd 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -805,6 +805,14 @@ def preload(self, ids): """Preload objects (only applies to remote repositories) """ + def discard_preload(self, discard_ids): + """ + Ignore/stop preloading objects (only applies to remote repositories). + + Be warned: trying to get_many() anything from discard_ids while specifying is_preloaded=True *will* + deadlock with remote repositories. + """ + class LoggedIO: diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index 50cb42c5c4..b2fa810e72 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -736,6 +736,32 @@ def test_extract_with_pattern(self): self.cmd("extract", self.repository_location + "::test", "fm:input/file1", "fm:*file33*", "input/file2") self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file333"]) + def test_extract_continue(self): + self.cmd('init', self.repository_location) + self.create_regular_file('file1', size=1024 * 80) + self.create_regular_file('file2', size=1024 * 100) + self.cmd('create', self.repository_location + '::test', 'input') + + with changedir('output'): + self.cmd('extract', self.repository_location + '::test') + with open('input/file1', 'ab') as file: + file.truncate(1234) + with open('input/file2', 'ab') as file: + file.write(b'extra bytes') + self.cmd('extract', self.repository_location + '::test', '--continue') + self.assert_dirs_equal('input', 'output/input') + + shutil.rmtree('output/input') + + with changedir('output'): + os.mkdir('input') + with open('input/file1', 'ab') as file: + file.truncate(1234) + with open('input/file2', 'ab') as file: + file.write(b'extra bytes') + self.cmd('extract', self.repository_location + '::test') + self.assert_dirs_equal('input', 
'output/input') + def test_extract_list_output(self): self.cmd('init', self.repository_location) self.create_regular_file('file', size=1024 * 80) diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index 55569e96e1..c9656c71ad 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -10,7 +10,8 @@ import msgpack.fallback from .. import platform -from ..helpers import Location +from ..cache import ChunkListEntry +from ..helpers import Location, slice_chunks from ..helpers import Buffer from ..helpers import partial_format, format_file_size, parse_file_size, format_timedelta, format_line, PlaceholderError, replace_placeholders from ..helpers import make_path_safe, clean_lines @@ -1081,3 +1082,29 @@ def test_swidth_slice_mixed_characters(): string = '나윤a선나윤선나윤선나윤선나윤선' assert swidth_slice(string, 5) == '나윤a' assert swidth_slice(string, 6) == '나윤a' + + +class TestSliceChunks: + @staticmethod + def chunk(size): + return ChunkListEntry(None, size, 0) + + @pytest.mark.parametrize('chunks, offset, expected_chunks, expected_prefix_length', ( + # Edge case: offset exactly on chunk boundary + ([(1, 1000), (2, 500)], 1000, [(2, 500)], 1000), + ([(1, 1000), (2, 500)], 999, [(1, 1000), (2, 500)], 0), + ([(1, 1000), (2, 500)], 1001, [(2, 500)], 1000), + + # Edge case: offset completely consumes + ([(1, 1000), (2, 500)], 1500, [], 1500), + ([(1, 1000), (2, 500)], 1499, [(2, 500)], 1000), + + # Edge case: offset > length of chunks + ([(1, 1000), (2, 500)], 1501, [], 1500), + )) + def test_basic(self, chunks, offset, expected_chunks, expected_prefix_length): + chunks = [ChunkListEntry(id, size, 0) for id, size in chunks] + expected_chunks = [ChunkListEntry(id, size, 0) for id, size in expected_chunks] + chunks, remaining_offset = slice_chunks(chunks, offset) + assert chunks == expected_chunks + assert remaining_offset == expected_prefix_length