From eee2c75e67dffc376cd604ae3ac9eeb29b9ef0f4 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 12 May 2022 01:45:50 +0200 Subject: [PATCH 1/2] repository index: add payload size (==csize) and flags to NSIndex entries This saves some segment file random IO that was previously necessary just to determine the size of to be deleted data. Keep old one as NSIndex1 for old borg compatibility. Choose NSIndex or NSIndex1 based on repo index layout from HashHeader. for an old repo index repo.get(key) returns segment, offset, None, None --- src/borg/hashindex.pyx | 99 +++++++++++++++++++++++++++++- src/borg/repository.py | 101 ++++++++++++++++++------------- src/borg/testsuite/hashindex.py | 48 +++++++-------- src/borg/testsuite/repository.py | 2 +- 4 files changed, 181 insertions(+), 69 deletions(-) diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 0e0ecc913c..6b2c5eb8b6 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -79,6 +79,20 @@ assert UINT32_MAX == 2**32-1 assert _MAX_VALUE % 2 == 1 +def hashindex_variant(fn): + """peek into an index file and find out what it is""" + with open(fn, 'rb') as f: + hh = f.read(18) # len(HashHeader) + magic = hh[0:8] + if magic == b'BORG_IDX': + key_size = hh[16] + value_size = hh[17] + return f'k{key_size}_v{value_size}' + if magic == b'12345678': # used by unit tests + return 'k32_v16' # just return the current variant + raise ValueError(f'unknown hashindex format, magic: {magic!r}') + + @cython.internal cdef class IndexBase: cdef HashIndex *index @@ -198,9 +212,12 @@ cdef class FuseVersionsIndex(IndexBase): return hashindex_get(self.index, key) != NULL +NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size extra') + + cdef class NSIndex(IndexBase): - value_size = 8 + value_size = 16 def __getitem__(self, key): assert len(key) == self.key_size @@ -209,15 +226,17 @@ cdef class NSIndex(IndexBase): raise KeyError(key) cdef uint32_t segment = _le32toh(data[0]) assert segment <= _MAX_VALUE, "maximum number of segments reached" - return segment, _le32toh(data[1]) + return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]), _le32toh(data[3])) def __setitem__(self, key, value): assert len(key) == self.key_size - cdef uint32_t[2] data + cdef uint32_t[4] data cdef uint32_t segment = value[0] assert segment <= _MAX_VALUE, "maximum number of segments reached" data[0] = _htole32(segment) data[1] = _htole32(value[1]) + data[2] = _htole32(value[2]) + data[3] = _htole32(value[3]) if not hashindex_set(self.index, key, data): raise Exception('hashindex_set failed') @@ -250,6 +269,80 @@ cdef class NSKeyIterator: cdef int key_size cdef int exhausted + def __cinit__(self, key_size): + self.key = NULL + self.key_size = key_size + self.exhausted = 0 + + def __iter__(self): + return self + + def __next__(self): + if self.exhausted: + raise StopIteration + self.key = hashindex_next_key(self.index, self.key) + if not self.key: + self.exhausted = 1 + raise StopIteration + cdef uint32_t *value = (self.key + self.key_size) + cdef uint32_t segment = _le32toh(value[0]) + assert segment <= _MAX_VALUE, "maximum number of segments reached" + return ((self.key)[:self.key_size], + NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2]), _le32toh(value[3]))) + + +cdef class NSIndex1(IndexBase): # legacy borg 1.x + + value_size = 8 + + def __getitem__(self, key): + assert len(key) == self.key_size + data = hashindex_get(self.index, key) + if not data: + raise KeyError(key) + cdef uint32_t segment = _le32toh(data[0]) + assert segment <= _MAX_VALUE, "maximum number of segments reached" + return segment, _le32toh(data[1]) + + def __setitem__(self, key, value): + assert len(key) == self.key_size + cdef uint32_t[2] data + cdef uint32_t segment = value[0] + assert segment <= _MAX_VALUE, "maximum number of segments reached" + data[0] = _htole32(segment) + data[1] = _htole32(value[1]) + if not hashindex_set(self.index, key, data): + raise Exception('hashindex_set failed') + + def __contains__(self, key): + cdef uint32_t segment + assert len(key) == self.key_size + data = hashindex_get(self.index, key) + if data != NULL: + segment = _le32toh(data[0]) + assert segment <= _MAX_VALUE, "maximum number of segments reached" + return data != NULL + + def iteritems(self, marker=None): + cdef const unsigned char *key + iter = NSKeyIterator1(self.key_size) + iter.idx = self + iter.index = self.index + if marker: + key = hashindex_get(self.index, marker) + if marker is None: + raise IndexError + iter.key = key - self.key_size + return iter + + +cdef class NSKeyIterator1: # legacy borg 1.x + cdef NSIndex1 idx + cdef HashIndex *index + cdef const unsigned char *key + cdef int key_size + cdef int exhausted + def __cinit__(self, key_size): self.key = NULL self.key_size = key_size diff --git a/src/borg/repository.py b/src/borg/repository.py index 9267fe0e6c..1d56cd0a49 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -13,7 +13,7 @@ from itertools import islice from .constants import * # NOQA -from .hashindex import NSIndex +from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size from .helpers import Location from .helpers import ProgressIndicatorPercent @@ -52,6 +52,18 @@ FreeSpace = partial(defaultdict, int) +def header_size(tag): + if tag == TAG_PUT2: + size = LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE + elif tag == TAG_PUT or tag == TAG_DELETE: + size = LoggedIO.HEADER_ID_SIZE + elif tag == TAG_COMMIT: + size = LoggedIO.header_fmt.size + else: + raise ValueError(f"unsupported tag: {tag!r}") + return size + + class Repository: """ Filesystem based transactional key value store @@ -525,10 +537,14 @@ def open_index(self, transaction_id, auto_recover=True): if transaction_id is None: return NSIndex() index_path = os.path.join(self.path, 'index.%d' % transaction_id) + variant = hashindex_variant(index_path) integrity_data = self._read_integrity(transaction_id, b'index') try: with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd: - return NSIndex.read(fd) + if variant == 'k32_v16': + return NSIndex.read(fd) + if variant == 'k32_v8': # legacy + return NSIndex1.read(fd) except (ValueError, OSError, FileIntegrityError) as exc: logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc) os.unlink(index_path) @@ -798,14 +814,14 @@ def complete_xfer(intermediate=True): if tag == TAG_COMMIT: continue in_index = self.index.get(key) - is_index_object = in_index == (segment, offset) + is_index_object = in_index and (in_index.segment, in_index.offset) == (segment, offset) if tag in (TAG_PUT2, TAG_PUT) and is_index_object: try: new_segment, offset = self.io.write_put(key, data, raise_full=True) except LoggedIO.SegmentFull: complete_xfer() new_segment, offset = self.io.write_put(key, data) - self.index[key] = new_segment, offset + self.index[key] = NSIndexEntry(new_segment, offset, len(data), in_index.extra) segments.setdefault(new_segment, 0) segments[new_segment] += 1 segments[segment] -= 1 @@ -821,10 +837,7 @@ def complete_xfer(intermediate=True): # do not remove entry with empty shadowed_segments list here, # it is needed for shadowed_put_exists code (see below)! pass - if tag == TAG_PUT2: - self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE - elif tag == TAG_PUT: - self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.storage_quota_use -= header_size(tag) + len(data) elif tag == TAG_DELETE and not in_index: # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag, # therefore we do not drop the delete, but write it to a current segment. @@ -919,27 +932,26 @@ def _update_index(self, segment, objects, report=None): if tag in (TAG_PUT2, TAG_PUT): try: # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space - s, _ = self.index[key] - self.compact[s] += size - self.segments[s] -= 1 + in_index = self.index[key] + self.compact[in_index.segment] += header_size(tag) + size + self.segments[in_index.segment] -= 1 except KeyError: pass - self.index[key] = segment, offset + self.index[key] = NSIndexEntry(segment, offset, size, 0) self.segments[segment] += 1 - self.storage_quota_use += size # note: size already includes the put_header_fmt overhead + self.storage_quota_use += header_size(tag) + size elif tag == TAG_DELETE: try: # if the deleted PUT is not in the index, there is nothing to clean up - s, offset = self.index.pop(key) + in_index = self.index.pop(key) except KeyError: pass else: - if self.io.segment_exists(s): + if self.io.segment_exists(in_index.segment): # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment # is already gone, then it was already compacted. - self.segments[s] -= 1 - size = self.io.read(s, offset, key, read_data=False) - self.compact[s] += size + self.segments[in_index.segment] -= 1 + self.compact[in_index.segment] += header_size(tag) + in_index.size elif tag == TAG_COMMIT: continue else: @@ -968,12 +980,13 @@ def _rebuild_sparse(self, segment): self.compact[segment] = 0 for tag, key, offset, size in self.io.iter_objects(segment, read_data=False): if tag in (TAG_PUT2, TAG_PUT): - if self.index.get(key, (-1, -1)) != (segment, offset): + in_index = self.index.get(key) + if not in_index or (in_index.segment, in_index.offset) != (segment, offset): # This PUT is superseded later - self.compact[segment] += size + self.compact[segment] += header_size(tag) + size elif tag == TAG_DELETE: # The outcome of the DELETE has been recorded in the PUT branch already - self.compact[segment] += size + self.compact[segment] += header_size(tag) + size def check(self, repair=False, save_space=False, max_duration=0): """Check repository consistency @@ -1169,7 +1182,7 @@ def scan(self, limit=None, marker=None): self.index = self.open_index(transaction_id) at_start = marker is None # smallest valid seg is 0, smallest valid offs is 8 - start_segment, start_offset = (0, 0) if at_start else self.index[marker] + start_segment, start_offset, _, _ = (0, 0, 0, 0) if at_start else self.index[marker] result = [] for segment, filename in self.io.segment_iterator(start_segment): obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False) @@ -1186,19 +1199,21 @@ def scan(self, limit=None, marker=None): # also, for the next segment, we need to start at offset 0. start_offset = 0 continue - if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id): - # we have found an existing and current object - result.append(id) - if len(result) == limit: - return result + if tag in (TAG_PUT2, TAG_PUT): + in_index = self.index.get(id) + if in_index and (in_index.segment, in_index.offset) == (segment, offset): + # we have found an existing and current object + result.append(id) + if len(result) == limit: + return result return result def get(self, id): if not self.index: self.index = self.open_index(self.get_transaction_id()) try: - segment, offset = self.index[id] - return self.io.read(segment, offset, id) + in_index = NSIndexEntry(*((self.index[id] + (None, None))[:4])) # legacy: no size/extra + return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size) except KeyError: raise self.ObjectNotFound(id, self.path) from None @@ -1215,7 +1230,7 @@ def put(self, id, data, wait=True): if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: - segment, offset = self.index[id] + in_index = self.index[id] except KeyError: pass else: @@ -1223,12 +1238,12 @@ def put(self, id, data, wait=True): # we do not want to update the shadow_index here, because # we know already that we will PUT to this id, so it will # be in the repo index (and we won't need it in the shadow_index). - self._delete(id, segment, offset, update_shadow_index=False) + self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=False) segment, offset = self.io.write_put(id, data) - self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE + self.storage_quota_use += header_size(TAG_PUT2) + len(data) self.segments.setdefault(segment, 0) self.segments[segment] += 1 - self.index[id] = segment, offset + self.index[id] = NSIndexEntry(segment, offset, len(data), 0) if self.storage_quota and self.storage_quota_use > self.storage_quota: self.transaction_doomed = self.StorageQuotaExceeded( format_file_size(self.storage_quota), format_file_size(self.storage_quota_use)) @@ -1243,22 +1258,21 @@ def delete(self, id, wait=True): if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: - segment, offset = self.index.pop(id) + in_index = self.index.pop(id) except KeyError: raise self.ObjectNotFound(id, self.path) from None # if we get here, there is an object with this id in the repo, # we write a DEL here that shadows the respective PUT. # after the delete, the object is not in the repo index any more, # for the compaction code, we need to update the shadow_index in this case. - self._delete(id, segment, offset, update_shadow_index=True) + self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=True) - def _delete(self, id, segment, offset, *, update_shadow_index): + def _delete(self, id, segment, offset, size, *, update_shadow_index): # common code used by put and delete if update_shadow_index: self.shadow_index.setdefault(id, []).append(segment) self.segments[segment] -= 1 - size = self.io.read(segment, offset, id, read_data=False) - self.compact[segment] += size + self.compact[segment] += size + header_size(TAG_PUT2) segment, size = self.io.write_delete(id) self.compact[segment] += size self.segments.setdefault(segment, 0) @@ -1515,7 +1529,8 @@ def iter_objects(self, segment, offset=0, include_data=False, read_data=True): if include_data: yield tag, key, offset, data else: - yield tag, key, offset, size + yield tag, key, offset, size - header_size(tag) # corresponds to len(data) + assert size >= 0 offset += size # we must get the fd via get_fd() here again as we yielded to our caller and it might # have triggered closing of the fd we had before (e.g. by calling io.read() for @@ -1580,7 +1595,7 @@ def entry_hash(self, *data): h.update(d) return h.digest() - def read(self, segment, offset, id, read_data=True): + def read(self, segment, offset, id, read_data=True, *, expected_size=None): """ Read entry from *segment* at *offset* with *id*. If read_data is False the size of the entry is returned instead. @@ -1596,7 +1611,11 @@ def read(self, segment, offset, id, read_data=True): if id != key: raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format( segment, offset)) - return data if read_data else size + data_size_from_header = size - header_size(tag) + if expected_size is not None and expected_size != data_size_from_header: + raise IntegrityError(f'size from repository index: {expected_size} != ' + f'size from entry header: {data_size_from_header}') + return data if read_data else data_size_from_header def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True): """ diff --git a/src/borg/testsuite/hashindex.py b/src/borg/testsuite/hashindex.py index 89a793a978..8721fc5666 100644 --- a/src/borg/testsuite/hashindex.py +++ b/src/borg/testsuite/hashindex.py @@ -87,8 +87,8 @@ def _generic_test(self, cls, make_value, sha): del idx def test_nsindex(self): - self._generic_test(NSIndex, lambda x: (x, x), - '85f72b036c692c8266e4f51ccf0cff2147204282b5e316ae508d30a448d88fef') + self._generic_test(NSIndex, lambda x: (x, x, x, x), + 'c9fe5878800d2a0691b667c665a00d4a186e204e891076d6b109016940742bed') def test_chunkindex(self): self._generic_test(ChunkIndex, lambda x: (x, x, x), @@ -102,7 +102,7 @@ def test_resize(self): initial_size = os.path.getsize(filepath) self.assert_equal(len(idx), 0) for x in range(n): - idx[H(x)] = x, x + idx[H(x)] = x, x, x, x idx.write(filepath) assert initial_size < os.path.getsize(filepath) for x in range(n): @@ -114,7 +114,7 @@ def test_resize(self): def test_iteritems(self): idx = NSIndex() for x in range(100): - idx[H(x)] = x, x + idx[H(x)] = x, x, x, x iterator = idx.iteritems() all = list(iterator) self.assert_equal(len(all), 100) @@ -515,9 +515,9 @@ class NSIndexTestCase(BaseTestCase): def test_nsindex_segment_limit(self): idx = NSIndex() with self.assert_raises(AssertionError): - idx[H(1)] = NSIndex.MAX_VALUE + 1, 0 + idx[H(1)] = NSIndex.MAX_VALUE + 1, 0, 0, 0 assert H(1) not in idx - idx[H(2)] = NSIndex.MAX_VALUE, 0 + idx[H(2)] = NSIndex.MAX_VALUE, 0, 0, 0 assert H(2) in idx @@ -532,38 +532,38 @@ def test_bug_4829(self): from struct import pack - def HH(x, y): - # make some 32byte long thing that depends on x and y. - # same x will mean a collision in the hashtable as bucket index is computed from - # first 4 bytes. giving a specific x targets bucket index x. - # y is to create different keys and does not go into the bucket index calculation. - # so, same x + different y --> collision - return pack(' collision + return pack(' Date: Thu, 12 May 2022 14:07:44 +0200 Subject: [PATCH 2/2] repository: sync write file in get_fd this fixes a strange test failure that did not happen until now: it could not read the MAGIC bytes from a (quite new) segment file, it just returned the empty string. maybe its appearance is related to the removed I/O calls. --- src/borg/repository.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/borg/repository.py b/src/borg/repository.py index 1d56cd0a49..6cc061bda5 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -1462,6 +1462,9 @@ def clean_old(): del self.fds[k] clean_old() + if self._write_fd is not None: + # without this, we have test failure now + self._write_fd.sync() try: ts, fd = self.fds[segment] except KeyError: