Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import socket
import stat
import struct
import sys
import time
from collections import OrderedDict
Expand Down Expand Up @@ -1173,16 +1174,17 @@ def stat_attrs(self, st, path, fd=None):


def cached_hash(chunk, id_hash):
chunk_id = chunk.meta.get('id')
allocation = chunk.meta['allocation']
if allocation == CH_DATA:
data = chunk.data
chunk_id = id_hash(data)
chunk_id = chunk_id or id_hash(data)
elif allocation in (CH_HOLE, CH_ALLOC):
size = chunk.meta['size']
assert size <= len(zeros)
data = memoryview(zeros)[:size]
try:
chunk_id = zero_chunk_ids[(id_hash, size)]
chunk_id = chunk_id or zero_chunk_ids[(id_hash, size)]
except KeyError:
chunk_id = id_hash(data)
zero_chunk_ids[(id_hash, size)] = chunk_id
Expand Down Expand Up @@ -1466,10 +1468,13 @@ def create_helper(self, tarinfo, status=None, type=None):
ph = tarinfo.pax_headers
if ph and 'BORG.item.version' in ph:
assert ph['BORG.item.version'] == '1'
chunked = ph['BORG.item.data_format'] == 'chunks'
meta_bin = base64.b64decode(ph['BORG.item.meta'])
meta_dict = msgpack.unpackb(meta_bin, object_hook=StableDict)
item = Item(internal_dict=meta_dict)
else:
chunked = False

def s_to_ns(s):
return safe_ns(int(float(s) * 1e9))

Expand All @@ -1483,35 +1488,55 @@ def s_to_ns(s):
if name in ph:
ns = s_to_ns(ph[name])
setattr(item, name, ns)
yield item, status
yield item, status, chunked
# if we get here, "with"-block worked ok without error/exception, the item was processed ok...
self.add_item(item, stats=self.stats)

def process_dir(self, *, tarinfo, status, type):
    """Import a directory entry from the tar stream (metadata only, no content)."""
    # diff residue resolved: create_helper now yields a 3-tuple (item, status, chunked).
    with self.create_helper(tarinfo, status, type) as (item, status, chunked):
        return status

def process_fifo(self, *, tarinfo, status, type):
    """Import a FIFO entry from the tar stream (metadata only, no content)."""
    with self.create_helper(tarinfo, status, type) as (item, status, chunked):
        return status

def process_dev(self, *, tarinfo, status, type):
    """Import a block/char device entry; store the device number on the item."""
    with self.create_helper(tarinfo, status, type) as (item, status, chunked):
        # combine the tar header's major/minor into a single device number
        item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
        return status

def process_link(self, *, tarinfo, status, type):
    """Import a symlink/hardlink entry; store the link target on the item."""
    with self.create_helper(tarinfo, status, type) as (item, status, chunked):
        item.source = tarinfo.linkname
        return status

def process_file(self, *, tarinfo, status, type, tar):
    """
    Import a regular file from the tar stream.

    For plain tar input (chunked == False) the raw file data is cut into
    chunks by our chunker. For BORG_C input (chunked == True, pax header
    BORG.item.data_format == 'chunks') the data is a sequence of chunk
    pack records which are decoded by _chunk_iter.
    """
    with self.create_helper(tarinfo, status, type) as (item, status, chunked):
        self.print_file_status(status, tarinfo.name)
        status = None  # we already printed the status
        fd = tar.extractfile(tarinfo)
        if not chunked:
            # raw content: re-chunk it with our chunker
            chunk_iter = backup_io_iter(self.chunker.chunkify(fd))
        else:
            def _chunk_iter(fd):
                # record format: int32 data_length + 256bit chunk_id + optional data
                hdr_fmt = struct.Struct('<i32s')
                hdr_length = hdr_fmt.size  # 4 + 32 bytes, derived from the format
                hdr = fd.read(hdr_length)
                while len(hdr) == hdr_length:
                    size, chunk_id = hdr_fmt.unpack(hdr)
                    if size >= 0:
                        # data follows the header: read exactly <size> bytes
                        chunk = fd.read(size)
                        assert len(chunk) == size
                    else:
                        # chunk is not streamed, we have its chunk_id and it should be in the repo
                        chunk = None
                        size = None
                    yield Chunk(chunk, id=chunk_id, size=size, allocation=CH_DATA)
                    hdr = fd.read(hdr_length)

            chunk_iter = _chunk_iter(fd)

        self.process_file_chunks(item, self.cache, self.stats, self.show_progress, chunk_iter)
        item.get_size(memorize=True)
        self.stats.nfiles += 1
        return status
Expand Down
81 changes: 63 additions & 18 deletions src/borg/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import functools
import hashlib
import inspect
import io
import itertools
import json
import logging
Expand All @@ -21,6 +22,7 @@
import shutil
import signal
import stat
import struct
import subprocess
import tarfile
import textwrap
Expand Down Expand Up @@ -556,7 +558,6 @@ def do_benchmark_cpu(self, args):
key_128 = os.urandom(16)
key_96 = os.urandom(12)

import io
from borg.chunker import get_chunker
print("Chunkers =======================================================")
size = "1GB"
Expand Down Expand Up @@ -1146,7 +1147,8 @@ def peek_and_store_hardlink_masters(item, matched):

# The | (pipe) symbol instructs tarfile to use a streaming mode of operation
# where it never seeks on the passed fileobj.
# BORG and BORG_C are both carried inside PAX-format tar files.
tar_format = dict(GNU=tarfile.GNU_FORMAT, PAX=tarfile.PAX_FORMAT,
                  BORG=tarfile.PAX_FORMAT, BORG_C=tarfile.PAX_FORMAT)[args.tar_format]
tar = tarfile.open(fileobj=tarstream, mode='w|', format=tar_format)

if progress:
Expand All @@ -1157,26 +1159,46 @@ def peek_and_store_hardlink_masters(item, matched):
else:
pi = None

def item_content_stream(item, *, chunked, chunk_ids, missing_ids):
    """
    Return a file-like object that reads from the chunks of *item*.

    :param chunked: if False, stream the plain content of all chunks;
                    if True, stream BORG_C chunk pack records instead.
    :param chunk_ids: ids of all chunks of *item*, in order.
    :param missing_ids: chunk ids whose data must be embedded in the pack
                        records; mutated in place as data is streamed.
    """
    if not chunked:
        # when chunked is False, we iterate over chunk_ids, this will iterate over all item chunks
        chunk_iterator = archive.pipeline.fetch_many(chunk_ids, is_preloaded=True)
    else:
        def _pack_iterator(chunk_ids, missing_ids):
            hdr_fmt = struct.Struct('<i32s')  # int32 data_length + 256bit chunk_id + optional data
            for chunk_id in chunk_ids:
                if chunk_id in missing_ids:
                    cdata = archive.repository.get(chunk_id)
                    chunk = archive.key.decrypt(chunk_id, cdata)
                    yield hdr_fmt.pack(len(chunk), chunk_id) + chunk
                    # removing the id means duplicate ids within this item get
                    # their data streamed only once (header-only afterwards)
                    missing_ids.remove(chunk_id)
                else:
                    # data_length -1 means: no data follows, receiver has this chunk
                    yield hdr_fmt.pack(-1, chunk_id)

        # iterate over all item chunks, but include data in packs only for missing chunks
        chunk_iterator = _pack_iterator(chunk_ids, missing_ids)

    if pi:
        info = [remove_surrogates(item.path)]
        return ChunkIteratorFileWrapper(chunk_iterator,
                                        lambda read_bytes: pi.show(increase=len(read_bytes), info=info))
    else:
        return ChunkIteratorFileWrapper(chunk_iterator)

def item_to_tarinfo(item, original_path):
def item_to_tarinfo(item, original_path, *, chunked, have_chunks):
"""
Transform a Borg *item* into a tarfile.TarInfo object.

Return a tuple (tarinfo, stream), where stream may be a file-like object that represents
the file contents, if any, and is None otherwise. When *tarinfo* is None, the *item*
cannot be represented as a TarInfo object and should be skipped.
Return a tuple (tarinfo, stream).
When *tarinfo* is None, the *item* cannot be represented as a TarInfo object and should be skipped.
stream may be a file-like object that somehow represents the file contents, if any, and is None otherwise.

chunked == False: stream is the plain file contents.

chunked == True: stream is a sequence of chunk packs.
"""
stream = None
tarinfo = tarfile.TarInfo()
Expand All @@ -1195,6 +1217,21 @@ def item_to_tarinfo(item, original_path):
# whether implementations actually support that is a whole different question...
tarinfo.linkname = ""

def mk_stream(item, *, chunked, have_chunks):
    """
    Build the content stream for *item* and compute the exact tar entry size.

    :param chunked: if True, produce BORG_C chunk pack records; else plain content.
    :param have_chunks: set of chunk ids already sent in this tar stream;
                        updated in place after the stream is created.
    :return: tuple (size, stream).
    """
    chunk_ids = [chunk_id for chunk_id, _, _ in item.chunks]
    if chunked:
        missing_ids = set(chunk_ids) - have_chunks
        # record format: int32 data_length + 256bit chunk_id + optional data
        # we send record headers for ALL chunks. but data only for missing chunks
        # 36 == 4 (int32 length) + 32 (chunk id) header bytes per record
        size = 36 * len(chunk_ids) + item.get_size(consider_once_ids=missing_ids)
    else:
        missing_ids = set(chunk_ids)
        size = item.get_size()  # just plain content data
    stream = item_content_stream(item, chunked=chunked, chunk_ids=chunk_ids, missing_ids=missing_ids)
    if chunked:
        have_chunks.update(missing_ids)  # now we have what was missing
    return size, stream

modebits = stat.S_IFMT(item.mode)
if modebits == stat.S_IFREG:
tarinfo.type = tarfile.REGTYPE
Expand All @@ -1212,12 +1249,12 @@ def item_to_tarinfo(item, original_path):
# The item which has the chunks was not put into the tar, therefore
# we do that now and update hardlink_masters to reflect that.
item.chunks = chunks
tarinfo.size = item.get_size()
stream = item_content_stream(item)
size, stream = mk_stream(item, chunked=chunked, have_chunks=have_chunks)
tarinfo.size = size
hardlink_masters[item.get('source') or original_path] = (None, item.path)
else:
tarinfo.size = item.get_size()
stream = item_content_stream(item)
size, stream = mk_stream(item, chunked=chunked, have_chunks=have_chunks)
tarinfo.size = size
elif modebits == stat.S_IFDIR:
tarinfo.type = tarfile.DIRTYPE
elif modebits == stat.S_IFLNK:
Expand Down Expand Up @@ -1257,29 +1294,37 @@ def item_to_paxheaders(format, item):
# Additionally to the standard tar / PAX metadata and data, it transfers
# ALL borg item metadata in a BORG specific way.
#
# BORG_C format
# -------------
# Same as BORG, but transmits (missing) chunks instead of full raw content data.
#
ph = {}
# note: for mtime this is a bit redundant as it is already done by tarfile module,
# but we just do it in our way to be consistent for sure.
for name in 'atime', 'ctime', 'mtime':
if hasattr(item, name):
ns = getattr(item, name)
ph[name] = str(ns / 1e9)
if format == 'BORG': # BORG format additions
if format in ('BORG', 'BORG_C'): # BORG format additions
ph['BORG.item.version'] = '1'
# BORG.item.meta - just serialize all metadata we have:
meta_bin = msgpack.packb(item.as_dict())
meta_text = base64.b64encode(meta_bin).decode()
ph['BORG.item.meta'] = meta_text
ph['BORG.item.data_format'] = 'raw' if format == 'BORG' else 'chunks' # BORG_C
return ph

have_chunks = set() # we know we already have these chunks at the target destination
chunked = args.tar_format == 'BORG_C'

for item in archive.iter_items(filter, partial_extract=partial_extract,
preload=True, hardlink_masters=hardlink_masters):
preload=not chunked, hardlink_masters=hardlink_masters):
orig_path = item.path
if strip_components:
item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
tarinfo, stream = item_to_tarinfo(item, orig_path)
tarinfo, stream = item_to_tarinfo(item, orig_path, chunked=chunked, have_chunks=have_chunks)
if tarinfo:
if args.tar_format in ('BORG', 'PAX'):
if args.tar_format in ('BORG', 'BORG_C', 'PAX'):
tarinfo.pax_headers = item_to_paxheaders(args.tar_format, item)
if output_list:
logging.getLogger('borg.output.list').info(remove_surrogates(orig_path))
Expand Down Expand Up @@ -4120,7 +4165,7 @@ def define_borg_mount(parser):
subparser.add_argument('--list', dest='output_list', action='store_true',
                       help='output verbose list of items (files, dirs, ...)')
# help text kept in sync with choices: BORG_C was added as a valid format
subparser.add_argument('--tar-format', metavar='FMT', dest='tar_format', default='GNU',
                       choices=('BORG', 'BORG_C', 'PAX', 'GNU'),
                       help='select tar format: BORG, BORG_C, PAX or GNU')
subparser.add_argument('location', metavar='ARCHIVE',
type=location_validator(archive=True),
Expand Down
2 changes: 1 addition & 1 deletion src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ def update_compatibility(self):
def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
if not self.txn_active:
self.begin_txn()
size = len(chunk)
size = len(chunk) if chunk is not None else None
refcount = self.seen_chunk(id, size)
if refcount and not overwrite:
return self.chunk_incref(id, stats)
Expand Down
18 changes: 14 additions & 4 deletions src/borg/item.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ class Item(PropDict):

part = PropDict._make_property('part', int)

def get_size(self, hardlink_masters=None, memorize=False, compressed=False, from_chunks=False, consider_ids=None):
def get_size(self, hardlink_masters=None, memorize=False, compressed=False, from_chunks=False,
consider_ids=None, consider_once_ids=None):
"""
Determine the (uncompressed or compressed) size of this item.

Expand All @@ -203,12 +204,14 @@ class Item(PropDict):
:param compressed: Whether the compressed or uncompressed size will be returned.
:param from_chunks: If true, size is computed from chunks even if a precomputed value is available.
:param consider_ids: Returns the size of the given ids only.
:param consider_once_ids: Returns the size of the given ids only and only considers each chunk id once.
"""
attr = 'csize' if compressed else 'size'
assert not (compressed and memorize), 'Item does not have a csize field.'
assert not (consider_ids is not None and memorize), "Can't store size when considering only certain ids"
assert not ((consider_ids is not None or consider_once_ids is not None)
and memorize), "Can't store size when considering only certain ids"
try:
if from_chunks or consider_ids is not None:
if from_chunks or consider_ids is not None or consider_once_ids is not None:
raise AttributeError
size = getattr(self, attr)
except AttributeError:
Expand Down Expand Up @@ -238,7 +241,14 @@ class Item(PropDict):
chunks, _ = hardlink_masters.get(master, (None, None))
if chunks is None:
return 0
if consider_ids is not None:
if consider_once_ids is not None:
size = 0
consider_once_ids = set(consider_once_ids)
for chunk in chunks:
if chunk.id in consider_once_ids:
size += getattr(ChunkListEntry(*chunk), attr)
consider_once_ids.remove(chunk.id)
elif consider_ids is not None:
size = sum(getattr(ChunkListEntry(*chunk), attr) for chunk in chunks if chunk.id in consider_ids)
else:
size = sum(getattr(ChunkListEntry(*chunk), attr) for chunk in chunks)
Expand Down