From 7f08b1fbc9bf751f165eafb8162e3d6923eb275a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 23 Apr 2022 02:27:08 +0200 Subject: [PATCH] export-tar --tar-format=BORG_C / import-tar: support chunked tar content while the BORG format uses a full, raw content byte stream, the BORG_C format uses a sequence of chunk packs. each pack is: - 32bit size (signed) - 256bit chunk id - bytes data (optional, only present if size != -1) for simplicity, a pack is generated for each entry in item.chunks, but only still missing chunks have data. packs with no data (size == -1) must already exist in the target repository. for simplicity / for now: - export-tar decrypts and decompresses, but chunks and chunk ids are kept - import-tar does not recompute chunk ids, accepts missing chunks "as is" (but recompresses / re-encrypts) and increfs already present chunks. - no preload via archive.iter_items for chunked mode - have_chunks is initialised to the empty set, thus only inner duplication in the exported archive is considered. 
--- src/borg/archive.py | 45 ++++++++++++++++++------ src/borg/archiver.py | 81 ++++++++++++++++++++++++++++++++++---------- src/borg/cache.py | 2 +- src/borg/item.pyx | 18 +++++++--- 4 files changed, 113 insertions(+), 33 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 6f8642fa0b..da791c3563 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -3,6 +3,7 @@ import os import socket import stat +import struct import sys import time from collections import OrderedDict @@ -1173,16 +1174,17 @@ def stat_attrs(self, st, path, fd=None): def cached_hash(chunk, id_hash): + chunk_id = chunk.meta.get('id') allocation = chunk.meta['allocation'] if allocation == CH_DATA: data = chunk.data - chunk_id = id_hash(data) + chunk_id = chunk_id or id_hash(data) elif allocation in (CH_HOLE, CH_ALLOC): size = chunk.meta['size'] assert size <= len(zeros) data = memoryview(zeros)[:size] try: - chunk_id = zero_chunk_ids[(id_hash, size)] + chunk_id = chunk_id or zero_chunk_ids[(id_hash, size)] except KeyError: chunk_id = id_hash(data) zero_chunk_ids[(id_hash, size)] = chunk_id @@ -1466,10 +1468,13 @@ def create_helper(self, tarinfo, status=None, type=None): ph = tarinfo.pax_headers if ph and 'BORG.item.version' in ph: assert ph['BORG.item.version'] == '1' + chunked = ph['BORG.item.data_format'] == 'chunks' meta_bin = base64.b64decode(ph['BORG.item.meta']) meta_dict = msgpack.unpackb(meta_bin, object_hook=StableDict) item = Item(internal_dict=meta_dict) else: + chunked = False + def s_to_ns(s): return safe_ns(int(float(s) * 1e9)) @@ -1483,35 +1488,55 @@ def s_to_ns(s): if name in ph: ns = s_to_ns(ph[name]) setattr(item, name, ns) - yield item, status + yield item, status, chunked # if we get here, "with"-block worked ok without error/exception, the item was processed ok... 
self.add_item(item, stats=self.stats) def process_dir(self, *, tarinfo, status, type): - with self.create_helper(tarinfo, status, type) as (item, status): + with self.create_helper(tarinfo, status, type) as (item, status, chunked): return status def process_fifo(self, *, tarinfo, status, type): - with self.create_helper(tarinfo, status, type) as (item, status): + with self.create_helper(tarinfo, status, type) as (item, status, chunked): return status def process_dev(self, *, tarinfo, status, type): - with self.create_helper(tarinfo, status, type) as (item, status): + with self.create_helper(tarinfo, status, type) as (item, status, chunked): item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor) return status def process_link(self, *, tarinfo, status, type): - with self.create_helper(tarinfo, status, type) as (item, status): + with self.create_helper(tarinfo, status, type) as (item, status, chunked): item.source = tarinfo.linkname return status def process_file(self, *, tarinfo, status, type, tar): - with self.create_helper(tarinfo, status, type) as (item, status): + with self.create_helper(tarinfo, status, type) as (item, status, chunked): self.print_file_status(status, tarinfo.name) status = None # we already printed the status fd = tar.extractfile(tarinfo) - self.process_file_chunks(item, self.cache, self.stats, self.show_progress, - backup_io_iter(self.chunker.chunkify(fd))) + if not chunked: + chunk_iter = backup_io_iter(self.chunker.chunkify(fd)) + else: + def _chunk_iter(fd): + hdr_fmt = struct.Struct('<i32s') + hdr_length = hdr_fmt.size + hdr = fd.read(hdr_length) + while len(hdr) == hdr_length: + size, chunk_id = hdr_fmt.unpack(hdr) + if size >= 0: + chunk = fd.read(size) + assert len(chunk) == size + else: + # chunk is not streamed, we have its chunk_id and it should be in the repo + chunk = None + size = None + yield Chunk(chunk, id=chunk_id, size=size, allocation=CH_DATA) + hdr = fd.read(hdr_length) + + chunk_iter = _chunk_iter(fd) + + self.process_file_chunks(item, self.cache, self.stats, self.show_progress, chunk_iter) item.get_size(memorize=True) self.stats.nfiles += 1 return status diff
--git a/src/borg/archiver.py b/src/borg/archiver.py index e4cd8d0749..138317c1a4 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -12,6 +12,7 @@ import functools import hashlib import inspect + import io import itertools import json import logging @@ -21,6 +22,7 @@ import shutil import signal import stat + import struct import subprocess import tarfile import textwrap @@ -556,7 +558,6 @@ def do_benchmark_cpu(self, args): key_128 = os.urandom(16) key_96 = os.urandom(12) - import io from borg.chunker import get_chunker print("Chunkers =======================================================") size = "1GB" @@ -1146,7 +1147,8 @@ def peek_and_store_hardlink_masters(item, matched): # The | (pipe) symbol instructs tarfile to use a streaming mode of operation # where it never seeks on the passed fileobj. - tar_format = dict(GNU=tarfile.GNU_FORMAT, PAX=tarfile.PAX_FORMAT, BORG=tarfile.PAX_FORMAT)[args.tar_format] + tar_format = dict(GNU=tarfile.GNU_FORMAT, PAX=tarfile.PAX_FORMAT, + BORG=tarfile.PAX_FORMAT, BORG_C=tarfile.PAX_FORMAT)[args.tar_format] tar = tarfile.open(fileobj=tarstream, mode='w|', format=tar_format) if progress: @@ -1157,12 +1159,28 @@ def peek_and_store_hardlink_masters(item, matched): else: pi = None - def item_content_stream(item): + def item_content_stream(item, *, chunked, chunk_ids, missing_ids): """ Return a file-like object that reads from the chunks of *item*. """ - chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item.chunks], - is_preloaded=True) + if not chunked: + # when chunked is False, we iterate over chunk_ids, this will iterate over all item chunks + chunk_iterator = archive.pipeline.fetch_many(chunk_ids, is_preloaded=True) + else: + def _pack_iterator(chunk_ids, missing_ids): + hdr_fmt = struct.Struct('