Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 96 additions & 3 deletions src/borg/hashindex.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ assert UINT32_MAX == 2**32-1
assert _MAX_VALUE % 2 == 1


def hashindex_variant(fn):
    """Peek into an index file and find out what it is.

    Reads the fixed-size HashHeader of *fn* and returns a variant string of
    the form 'k<key_size>_v<value_size>' (sizes in bytes).

    Raises ValueError if the magic is unknown or the header is truncated.
    """
    with open(fn, 'rb') as f:
        hh = f.read(18)  # len(HashHeader)
    magic = hh[0:8]
    if magic == b'BORG_IDX':
        if len(hh) < 18:
            # a valid BORG_IDX file always has the full 18 byte header;
            # raise ValueError (not IndexError from hh[16]) for truncated files
            raise ValueError(f'hashindex file {fn!r} has a truncated header')
        key_size = hh[16]
        value_size = hh[17]
        return f'k{key_size}_v{value_size}'
    if magic == b'12345678':  # used by unit tests
        return 'k32_v16'  # just return the current variant
    raise ValueError(f'unknown hashindex format, magic: {magic!r}')


@cython.internal
cdef class IndexBase:
cdef HashIndex *index
Expand Down Expand Up @@ -198,9 +212,12 @@ cdef class FuseVersionsIndex(IndexBase):
return hashindex_get(self.index, <unsigned char *>key) != NULL


# one repo index entry: where the object lives (segment, offset), its data
# size, and an extra field reserved for future use
NSIndexEntry = namedtuple('NSIndexEntry', ['segment', 'offset', 'size', 'extra'])


cdef class NSIndex(IndexBase):

value_size = 8
value_size = 16

def __getitem__(self, key):
assert len(key) == self.key_size
Expand All @@ -209,15 +226,17 @@ cdef class NSIndex(IndexBase):
raise KeyError(key)
cdef uint32_t segment = _le32toh(data[0])
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return segment, _le32toh(data[1])
return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]), _le32toh(data[3]))

def __setitem__(self, key, value):
    # Store a 4-field entry for *key*; *value* is any 4-sequence
    # (segment, offset, size, extra), e.g. an NSIndexEntry.
    assert len(key) == self.key_size
    cdef uint32_t[4] data
    cdef uint32_t segment = value[0]
    assert segment <= _MAX_VALUE, "maximum number of segments reached"
    # entries are stored little-endian, independent of host byte order
    data[0] = _htole32(segment)
    data[1] = _htole32(value[1])
    data[2] = _htole32(value[2])
    data[3] = _htole32(value[3])
    if not hashindex_set(self.index, <unsigned char *>key, data):
        raise Exception('hashindex_set failed')

Expand Down Expand Up @@ -250,6 +269,80 @@ cdef class NSKeyIterator:
cdef int key_size
cdef int exhausted

def __cinit__(self, key_size):
    # start before the first key; hashindex_next_key(NULL) yields the first one
    self.key = NULL
    self.key_size = key_size
    self.exhausted = 0

def __iter__(self):
    # the iterator is its own iterable
    return self

def __next__(self):
    # Yield (key_bytes, NSIndexEntry) pairs in internal hash table order.
    if self.exhausted:
        raise StopIteration
    # advance to the next used bucket (NULL self.key means "start")
    self.key = hashindex_next_key(self.index, <unsigned char *>self.key)
    if not self.key:
        self.exhausted = 1
        raise StopIteration
    # the 4 uint32 value fields are stored right behind the key
    cdef uint32_t *value = <uint32_t *>(self.key + self.key_size)
    cdef uint32_t segment = _le32toh(value[0])
    assert segment <= _MAX_VALUE, "maximum number of segments reached"
    return ((<char *>self.key)[:self.key_size],
            NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2]), _le32toh(value[3])))


cdef class NSIndex1(IndexBase):  # legacy borg 1.x
    """Repository index with 8-byte values (segment, offset) as written by borg 1.x."""

    value_size = 8

    def __getitem__(self, key):
        assert len(key) == self.key_size
        data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
        if not data:
            raise KeyError(key)
        # entries are stored little-endian, independent of host byte order
        cdef uint32_t segment = _le32toh(data[0])
        assert segment <= _MAX_VALUE, "maximum number of segments reached"
        return segment, _le32toh(data[1])

    def __setitem__(self, key, value):
        # *value* is a 2-sequence (segment, offset)
        assert len(key) == self.key_size
        cdef uint32_t[2] data
        cdef uint32_t segment = value[0]
        assert segment <= _MAX_VALUE, "maximum number of segments reached"
        data[0] = _htole32(segment)
        data[1] = _htole32(value[1])
        if not hashindex_set(self.index, <unsigned char *>key, data):
            raise Exception('hashindex_set failed')

    def __contains__(self, key):
        cdef uint32_t segment
        assert len(key) == self.key_size
        data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
        if data != NULL:
            # sanity-check the stored segment number even on a pure lookup
            segment = _le32toh(data[0])
            assert segment <= _MAX_VALUE, "maximum number of segments reached"
        return data != NULL

    def iteritems(self, marker=None):
        # Iterate (key, (segment, offset)) pairs, optionally starting right
        # after *marker* (which must be a key present in the index).
        cdef const unsigned char *key
        iter = NSKeyIterator1(self.key_size)
        iter.idx = self
        iter.index = self.index
        if marker:
            key = hashindex_get(self.index, <unsigned char *>marker)
            # fix: the old code checked `marker is None` here, which is always
            # False inside `if marker:` - check the lookup result instead, so a
            # marker that is not in the index raises instead of dereferencing NULL
            if key == NULL:
                raise IndexError
            iter.key = key - self.key_size
        return iter


cdef class NSKeyIterator1: # legacy borg 1.x
cdef NSIndex1 idx
cdef HashIndex *index
cdef const unsigned char *key
cdef int key_size
cdef int exhausted

def __cinit__(self, key_size):
self.key = NULL
self.key_size = key_size
Expand Down
104 changes: 63 additions & 41 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from itertools import islice

from .constants import * # NOQA
from .hashindex import NSIndex
from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
from .helpers import Location
from .helpers import ProgressIndicatorPercent
Expand Down Expand Up @@ -52,6 +52,18 @@
FreeSpace = partial(defaultdict, int)


def header_size(tag):
    """Return the size in bytes of the on-disk header for an entry of type *tag*.

    Raises ValueError for tags that have no defined header layout.
    """
    if tag == TAG_PUT2:
        # id header plus the integrity hash that PUT2 entries carry
        return LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE
    if tag in (TAG_PUT, TAG_DELETE):
        return LoggedIO.HEADER_ID_SIZE
    if tag == TAG_COMMIT:
        # commit entries have no id, just the basic header
        return LoggedIO.header_fmt.size
    raise ValueError(f"unsupported tag: {tag!r}")


class Repository:
"""
Filesystem based transactional key value store
Expand Down Expand Up @@ -525,10 +537,14 @@ def open_index(self, transaction_id, auto_recover=True):
if transaction_id is None:
return NSIndex()
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
variant = hashindex_variant(index_path)
integrity_data = self._read_integrity(transaction_id, b'index')
try:
with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
return NSIndex.read(fd)
if variant == 'k32_v16':
return NSIndex.read(fd)
if variant == 'k32_v8': # legacy
return NSIndex1.read(fd)
except (ValueError, OSError, FileIntegrityError) as exc:
logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
os.unlink(index_path)
Expand Down Expand Up @@ -798,14 +814,14 @@ def complete_xfer(intermediate=True):
if tag == TAG_COMMIT:
continue
in_index = self.index.get(key)
is_index_object = in_index == (segment, offset)
is_index_object = in_index and (in_index.segment, in_index.offset) == (segment, offset)
if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
try:
new_segment, offset = self.io.write_put(key, data, raise_full=True)
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = new_segment, offset
self.index[key] = NSIndexEntry(new_segment, offset, len(data), in_index.extra)
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
Expand All @@ -821,10 +837,7 @@ def complete_xfer(intermediate=True):
# do not remove entry with empty shadowed_segments list here,
# it is needed for shadowed_put_exists code (see below)!
pass
if tag == TAG_PUT2:
self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
elif tag == TAG_PUT:
self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
self.storage_quota_use -= header_size(tag) + len(data)
elif tag == TAG_DELETE and not in_index:
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
# therefore we do not drop the delete, but write it to a current segment.
Expand Down Expand Up @@ -919,27 +932,26 @@ def _update_index(self, segment, objects, report=None):
if tag in (TAG_PUT2, TAG_PUT):
try:
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
s, _ = self.index[key]
self.compact[s] += size
self.segments[s] -= 1
in_index = self.index[key]
self.compact[in_index.segment] += header_size(tag) + size
self.segments[in_index.segment] -= 1
except KeyError:
pass
self.index[key] = segment, offset
self.index[key] = NSIndexEntry(segment, offset, size, 0)
self.segments[segment] += 1
self.storage_quota_use += size # note: size already includes the put_header_fmt overhead
self.storage_quota_use += header_size(tag) + size
elif tag == TAG_DELETE:
try:
# if the deleted PUT is not in the index, there is nothing to clean up
s, offset = self.index.pop(key)
in_index = self.index.pop(key)
except KeyError:
pass
else:
if self.io.segment_exists(s):
if self.io.segment_exists(in_index.segment):
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
# is already gone, then it was already compacted.
self.segments[s] -= 1
size = self.io.read(s, offset, key, read_data=False)
self.compact[s] += size
self.segments[in_index.segment] -= 1
self.compact[in_index.segment] += header_size(tag) + in_index.size
elif tag == TAG_COMMIT:
continue
else:
Expand Down Expand Up @@ -968,12 +980,13 @@ def _rebuild_sparse(self, segment):
self.compact[segment] = 0
for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
if tag in (TAG_PUT2, TAG_PUT):
if self.index.get(key, (-1, -1)) != (segment, offset):
in_index = self.index.get(key)
if not in_index or (in_index.segment, in_index.offset) != (segment, offset):
# This PUT is superseded later
self.compact[segment] += size
self.compact[segment] += header_size(tag) + size
elif tag == TAG_DELETE:
# The outcome of the DELETE has been recorded in the PUT branch already
self.compact[segment] += size
self.compact[segment] += header_size(tag) + size

def check(self, repair=False, save_space=False, max_duration=0):
"""Check repository consistency
Expand Down Expand Up @@ -1169,7 +1182,7 @@ def scan(self, limit=None, marker=None):
self.index = self.open_index(transaction_id)
at_start = marker is None
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
start_segment, start_offset = (0, 0) if at_start else self.index[marker]
start_segment, start_offset, _, _ = (0, 0, 0, 0) if at_start else self.index[marker]
result = []
for segment, filename in self.io.segment_iterator(start_segment):
obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
Expand All @@ -1186,19 +1199,21 @@ def scan(self, limit=None, marker=None):
# also, for the next segment, we need to start at offset 0.
start_offset = 0
continue
if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
# we have found an existing and current object
result.append(id)
if len(result) == limit:
return result
if tag in (TAG_PUT2, TAG_PUT):
in_index = self.index.get(id)
if in_index and (in_index.segment, in_index.offset) == (segment, offset):
# we have found an existing and current object
result.append(id)
if len(result) == limit:
return result
return result

def get(self, id):
    """Return the data stored for object *id*.

    Raises ObjectNotFound if the id is not present in the repository index.
    """
    if not self.index:
        self.index = self.open_index(self.get_transaction_id())
    try:
        # current index entries have 4 fields, legacy ones only 2 -
        # pad with (None, None) so both unpack as (segment, offset, size, extra)
        segment, offset, size, _ = (tuple(self.index[id]) + (None, None))[:4]
        return self.io.read(segment, offset, id, expected_size=size)
    except KeyError:
        raise self.ObjectNotFound(id, self.path) from None

Expand All @@ -1215,20 +1230,20 @@ def put(self, id, data, wait=True):
if not self._active_txn:
self.prepare_txn(self.get_transaction_id())
try:
segment, offset = self.index[id]
in_index = self.index[id]
except KeyError:
pass
else:
# note: doing a delete first will do some bookkeeping.
# we do not want to update the shadow_index here, because
# we know already that we will PUT to this id, so it will
# be in the repo index (and we won't need it in the shadow_index).
self._delete(id, segment, offset, update_shadow_index=False)
self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=False)
segment, offset = self.io.write_put(id, data)
self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
self.storage_quota_use += header_size(TAG_PUT2) + len(data)
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
self.index[id] = segment, offset
self.index[id] = NSIndexEntry(segment, offset, len(data), 0)
if self.storage_quota and self.storage_quota_use > self.storage_quota:
self.transaction_doomed = self.StorageQuotaExceeded(
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
Expand All @@ -1243,22 +1258,21 @@ def delete(self, id, wait=True):
if not self._active_txn:
self.prepare_txn(self.get_transaction_id())
try:
segment, offset = self.index.pop(id)
in_index = self.index.pop(id)
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
# if we get here, there is an object with this id in the repo,
# we write a DEL here that shadows the respective PUT.
# after the delete, the object is not in the repo index any more,
# for the compaction code, we need to update the shadow_index in this case.
self._delete(id, segment, offset, update_shadow_index=True)
self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=True)

def _delete(self, id, segment, offset, *, update_shadow_index):
def _delete(self, id, segment, offset, size, *, update_shadow_index):
# common code used by put and delete
if update_shadow_index:
self.shadow_index.setdefault(id, []).append(segment)
self.segments[segment] -= 1
size = self.io.read(segment, offset, id, read_data=False)
self.compact[segment] += size
self.compact[segment] += size + header_size(TAG_PUT2)
segment, size = self.io.write_delete(id)
self.compact[segment] += size
self.segments.setdefault(segment, 0)
Expand Down Expand Up @@ -1448,6 +1462,9 @@ def clean_old():
del self.fds[k]

clean_old()
if self._write_fd is not None:
# without this, we have test failure now
self._write_fd.sync()
Comment on lines +1465 to +1467
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed to avoid a test failure:
otherwise, the MAGIC could not be read back from a recently created segment file.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done a bit more than necessary, sufficient would be:

self._write_fd.f.flush()

But this is not available yet via SyncFile api.

try:
ts, fd = self.fds[segment]
except KeyError:
Expand Down Expand Up @@ -1515,7 +1532,8 @@ def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
if include_data:
yield tag, key, offset, data
else:
yield tag, key, offset, size
yield tag, key, offset, size - header_size(tag) # corresponds to len(data)
assert size >= 0
Comment on lines 1532 to +1536
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this so that it either returns the data (so the caller can do len(data)), or, when not returning the data, it returns precisely len(data) without any header overhead.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if callers now want the size with overhead, they add header_size(tag) again.

offset += size
# we must get the fd via get_fd() here again as we yielded to our caller and it might
# have triggered closing of the fd we had before (e.g. by calling io.read() for
Expand Down Expand Up @@ -1580,7 +1598,7 @@ def entry_hash(self, *data):
h.update(d)
return h.digest()

def read(self, segment, offset, id, read_data=True):
def read(self, segment, offset, id, read_data=True, *, expected_size=None):
"""
Read entry from *segment* at *offset* with *id*.
If read_data is False the size of the entry is returned instead.
Expand All @@ -1596,7 +1614,11 @@ def read(self, segment, offset, id, read_data=True):
if id != key:
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
segment, offset))
return data if read_data else size
data_size_from_header = size - header_size(tag)
if expected_size is not None and expected_size != data_size_from_header:
raise IntegrityError(f'size from repository index: {expected_size} != '
f'size from entry header: {data_size_from_header}')
return data if read_data else data_size_from_header

def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
"""
Expand Down
Loading