diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx
index 0e0ecc913c..6b2c5eb8b6 100644
--- a/src/borg/hashindex.pyx
+++ b/src/borg/hashindex.pyx
@@ -79,6 +79,20 @@ assert UINT32_MAX == 2**32-1
 assert _MAX_VALUE % 2 == 1
 
 
+def hashindex_variant(fn):
+    """peek into an index file and find out what it is"""
+    with open(fn, 'rb') as f:
+        hh = f.read(18)  # len(HashHeader)
+    magic = hh[0:8]
+    if magic == b'BORG_IDX':
+        key_size = hh[16]
+        value_size = hh[17]
+        return f'k{key_size}_v{value_size}'
+    if magic == b'12345678':  # used by unit tests
+        return 'k32_v16'  # just return the current variant
+    raise ValueError(f'unknown hashindex format, magic: {magic!r}')
+
+
 @cython.internal
 cdef class IndexBase:
     cdef HashIndex *index
@@ -198,9 +212,12 @@ cdef class FuseVersionsIndex(IndexBase):
         return hashindex_get(self.index, key) != NULL
 
 
+NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size extra')
+
+
 cdef class NSIndex(IndexBase):
 
-    value_size = 8
+    value_size = 16
 
     def __getitem__(self, key):
         assert len(key) == self.key_size
@@ -209,15 +226,17 @@ cdef class NSIndex(IndexBase):
             raise KeyError(key)
         cdef uint32_t segment = _le32toh(data[0])
         assert segment <= _MAX_VALUE, "maximum number of segments reached"
-        return segment, _le32toh(data[1])
+        return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]), _le32toh(data[3]))
 
     def __setitem__(self, key, value):
         assert len(key) == self.key_size
-        cdef uint32_t[2] data
+        cdef uint32_t[4] data
         cdef uint32_t segment = value[0]
         assert segment <= _MAX_VALUE, "maximum number of segments reached"
         data[0] = _htole32(segment)
         data[1] = _htole32(value[1])
+        data[2] = _htole32(value[2])
+        data[3] = _htole32(value[3])
         if not hashindex_set(self.index, key, data):
             raise Exception('hashindex_set failed')
 
@@ -250,6 +269,80 @@ cdef class NSKeyIterator:
     cdef int key_size
     cdef int exhausted
 
+    def __cinit__(self, key_size):
+        self.key = NULL
+        self.key_size = key_size
+        self.exhausted = 0
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.exhausted:
+            raise StopIteration
+        self.key = hashindex_next_key(self.index, <unsigned char *>self.key)
+        if not self.key:
+            self.exhausted = 1
+            raise StopIteration
+        cdef uint32_t *value = <uint32_t *>(self.key + self.key_size)
+        cdef uint32_t segment = _le32toh(value[0])
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return ((<char *>self.key)[:self.key_size],
+                NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2]), _le32toh(value[3])))
+
+
+cdef class NSIndex1(IndexBase):  # legacy borg 1.x
+
+    value_size = 8
+
+    def __getitem__(self, key):
+        assert len(key) == self.key_size
+        data = <uint32_t *>hashindex_get(self.index, key)
+        if not data:
+            raise KeyError(key)
+        cdef uint32_t segment = _le32toh(data[0])
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return segment, _le32toh(data[1])
+
+    def __setitem__(self, key, value):
+        assert len(key) == self.key_size
+        cdef uint32_t[2] data
+        cdef uint32_t segment = value[0]
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        data[0] = _htole32(segment)
+        data[1] = _htole32(value[1])
+        if not hashindex_set(self.index, key, data):
+            raise Exception('hashindex_set failed')
+
+    def __contains__(self, key):
+        cdef uint32_t segment
+        assert len(key) == self.key_size
+        data = <uint32_t *>hashindex_get(self.index, key)
+        if data != NULL:
+            segment = _le32toh(data[0])
+            assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return data != NULL
+
+    def iteritems(self, marker=None):
+        cdef const unsigned char *key
+        iter = NSKeyIterator1(self.key_size)
+        iter.idx = self
+        iter.index = self.index
+        if marker:
+            key = hashindex_get(self.index, marker)
+            if marker is None:
+                raise IndexError
+            iter.key = key - self.key_size
+        return iter
+
+
+cdef class NSKeyIterator1:  # legacy borg 1.x
+    cdef NSIndex1 idx
+    cdef HashIndex *index
+    cdef const unsigned char *key
+    cdef int key_size
+    cdef int exhausted
+
     def __cinit__(self, key_size):
         self.key = NULL
         self.key_size = key_size
diff --git a/src/borg/repository.py b/src/borg/repository.py
index 9267fe0e6c..6cc061bda5 100644
--- a/src/borg/repository.py
+++ b/src/borg/repository.py
@@ -13,7 +13,7 @@
 from itertools import islice
 
 from .constants import *  # NOQA
-from .hashindex import NSIndex
+from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant
 from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
 from .helpers import Location
 from .helpers import ProgressIndicatorPercent
@@ -52,6 +52,18 @@
 FreeSpace = partial(defaultdict, int)
 
 
+def header_size(tag):
+    if tag == TAG_PUT2:
+        size = LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE
+    elif tag == TAG_PUT or tag == TAG_DELETE:
+        size = LoggedIO.HEADER_ID_SIZE
+    elif tag == TAG_COMMIT:
+        size = LoggedIO.header_fmt.size
+    else:
+        raise ValueError(f"unsupported tag: {tag!r}")
+    return size
+
+
 class Repository:
     """
     Filesystem based transactional key value store
@@ -525,10 +537,14 @@ def open_index(self, transaction_id, auto_recover=True):
         if transaction_id is None:
             return NSIndex()
         index_path = os.path.join(self.path, 'index.%d' % transaction_id)
+        variant = hashindex_variant(index_path)
         integrity_data = self._read_integrity(transaction_id, b'index')
         try:
             with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
-                return NSIndex.read(fd)
+                if variant == 'k32_v16':
+                    return NSIndex.read(fd)
+                if variant == 'k32_v8':  # legacy
+                    return NSIndex1.read(fd)
         except (ValueError, OSError, FileIntegrityError) as exc:
             logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
             os.unlink(index_path)
@@ -798,14 +814,14 @@ def complete_xfer(intermediate=True):
                 if tag == TAG_COMMIT:
                     continue
                 in_index = self.index.get(key)
-                is_index_object = in_index == (segment, offset)
+                is_index_object = in_index and (in_index.segment, in_index.offset) == (segment, offset)
                 if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
                     try:
                         new_segment, offset = self.io.write_put(key, data, raise_full=True)
                     except LoggedIO.SegmentFull:
                         complete_xfer()
                         new_segment, offset = self.io.write_put(key, data)
-                    self.index[key] = new_segment, offset
+                    self.index[key] = NSIndexEntry(new_segment, offset, len(data), in_index.extra)
                     segments.setdefault(new_segment, 0)
                     segments[new_segment] += 1
                     segments[segment] -= 1
@@ -821,10 +837,7 @@
                         # do not remove entry with empty shadowed_segments list here,
                         # it is needed for shadowed_put_exists code (see below)!
                         pass
-                    if tag == TAG_PUT2:
-                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
-                    elif tag == TAG_PUT:
-                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
+                    self.storage_quota_use -= header_size(tag) + len(data)
                 elif tag == TAG_DELETE and not in_index:
                     # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                     # therefore we do not drop the delete, but write it to a current segment.
@@ -919,27 +932,26 @@ def _update_index(self, segment, objects, report=None):
             if tag in (TAG_PUT2, TAG_PUT):
                 try:
                     # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
-                    s, _ = self.index[key]
-                    self.compact[s] += size
-                    self.segments[s] -= 1
+                    in_index = self.index[key]
+                    self.compact[in_index.segment] += header_size(tag) + size
+                    self.segments[in_index.segment] -= 1
                 except KeyError:
                     pass
-                self.index[key] = segment, offset
+                self.index[key] = NSIndexEntry(segment, offset, size, 0)
                 self.segments[segment] += 1
-                self.storage_quota_use += size  # note: size already includes the put_header_fmt overhead
+                self.storage_quota_use += header_size(tag) + size
             elif tag == TAG_DELETE:
                 try:
                     # if the deleted PUT is not in the index, there is nothing to clean up
-                    s, offset = self.index.pop(key)
+                    in_index = self.index.pop(key)
                 except KeyError:
                     pass
                 else:
-                    if self.io.segment_exists(s):
+                    if self.io.segment_exists(in_index.segment):
                         # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
                         # is already gone, then it was already compacted.
-                        self.segments[s] -= 1
-                        size = self.io.read(s, offset, key, read_data=False)
-                        self.compact[s] += size
+                        self.segments[in_index.segment] -= 1
+                        self.compact[in_index.segment] += header_size(tag) + in_index.size
             elif tag == TAG_COMMIT:
                 continue
             else:
@@ -968,12 +980,13 @@ def _rebuild_sparse(self, segment):
             self.compact[segment] = 0
         for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
             if tag in (TAG_PUT2, TAG_PUT):
-                if self.index.get(key, (-1, -1)) != (segment, offset):
+                in_index = self.index.get(key)
+                if not in_index or (in_index.segment, in_index.offset) != (segment, offset):
                     # This PUT is superseded later
-                    self.compact[segment] += size
+                    self.compact[segment] += header_size(tag) + size
             elif tag == TAG_DELETE:
                 # The outcome of the DELETE has been recorded in the PUT branch already
-                self.compact[segment] += size
+                self.compact[segment] += header_size(tag) + size
 
     def check(self, repair=False, save_space=False, max_duration=0):
         """Check repository consistency
@@ -1169,7 +1182,7 @@ def scan(self, limit=None, marker=None):
         self.index = self.open_index(transaction_id)
         at_start = marker is None
         # smallest valid seg is 0, smallest valid offs is 8
-        start_segment, start_offset = (0, 0) if at_start else self.index[marker]
+        start_segment, start_offset, _, _ = (0, 0, 0, 0) if at_start else self.index[marker]
         result = []
         for segment, filename in self.io.segment_iterator(start_segment):
             obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
@@ -1186,19 +1199,21 @@ def scan(self, limit=None, marker=None):
                         # also, for the next segment, we need to start at offset 0.
                        start_offset = 0
                        continue
-                if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
-                    # we have found an existing and current object
-                    result.append(id)
-                    if len(result) == limit:
-                        return result
+                if tag in (TAG_PUT2, TAG_PUT):
+                    in_index = self.index.get(id)
+                    if in_index and (in_index.segment, in_index.offset) == (segment, offset):
+                        # we have found an existing and current object
+                        result.append(id)
+                        if len(result) == limit:
+                            return result
         return result
 
     def get(self, id):
         if not self.index:
             self.index = self.open_index(self.get_transaction_id())
         try:
-            segment, offset = self.index[id]
-            return self.io.read(segment, offset, id)
+            in_index = NSIndexEntry(*((self.index[id] + (None, None))[:4]))  # legacy: no size/extra
+            return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size)
         except KeyError:
             raise self.ObjectNotFound(id, self.path) from None
 
@@ -1215,7 +1230,7 @@ def put(self, id, data, wait=True):
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
-            segment, offset = self.index[id]
+            in_index = self.index[id]
         except KeyError:
             pass
         else:
@@ -1223,12 +1238,12 @@ def put(self, id, data, wait=True):
             # we do not want to update the shadow_index here, because
             # we know already that we will PUT to this id, so it will
             # be in the repo index (and we won't need it in the shadow_index).
-            self._delete(id, segment, offset, update_shadow_index=False)
+            self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=False)
         segment, offset = self.io.write_put(id, data)
-        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
+        self.storage_quota_use += header_size(TAG_PUT2) + len(data)
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
-        self.index[id] = segment, offset
+        self.index[id] = NSIndexEntry(segment, offset, len(data), 0)
         if self.storage_quota and self.storage_quota_use > self.storage_quota:
             self.transaction_doomed = self.StorageQuotaExceeded(
                 format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
@@ -1243,22 +1258,21 @@ def delete(self, id, wait=True):
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
-            segment, offset = self.index.pop(id)
+            in_index = self.index.pop(id)
        except KeyError:
            raise self.ObjectNotFound(id, self.path) from None
        # if we get here, there is an object with this id in the repo,
        # we write a DEL here that shadows the respective PUT.
        # after the delete, the object is not in the repo index any more,
        # for the compaction code, we need to update the shadow_index in this case.
-        self._delete(id, segment, offset, update_shadow_index=True)
+        self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=True)
 
-    def _delete(self, id, segment, offset, *, update_shadow_index):
+    def _delete(self, id, segment, offset, size, *, update_shadow_index):
         # common code used by put and delete
         if update_shadow_index:
             self.shadow_index.setdefault(id, []).append(segment)
         self.segments[segment] -= 1
-        size = self.io.read(segment, offset, id, read_data=False)
-        self.compact[segment] += size
+        self.compact[segment] += size + header_size(TAG_PUT2)
         segment, size = self.io.write_delete(id)
         self.compact[segment] += size
         self.segments.setdefault(segment, 0)
@@ -1448,6 +1462,9 @@ def clean_old():
                 del self.fds[k]
         clean_old()
+        if self._write_fd is not None:
+            # without this, we have test failure now
+            self._write_fd.sync()
         try:
             ts, fd = self.fds[segment]
         except KeyError:
@@ -1515,7 +1532,8 @@ def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
             if include_data:
                 yield tag, key, offset, data
             else:
-                yield tag, key, offset, size
+                yield tag, key, offset, size - header_size(tag)  # corresponds to len(data)
+            assert size >= 0
             offset += size
             # we must get the fd via get_fd() here again as we yielded to our caller and it might
             # have triggered closing of the fd we had before (e.g. by calling io.read() for
@@ -1580,7 +1598,7 @@ def entry_hash(self, *data):
             h.update(d)
         return h.digest()
 
-    def read(self, segment, offset, id, read_data=True):
+    def read(self, segment, offset, id, read_data=True, *, expected_size=None):
         """
         Read entry from *segment* at *offset* with *id*.
         If read_data is False the size of the entry is returned instead.
@@ -1596,7 +1614,11 @@ def read(self, segment, offset, id, read_data=True, *, expected_size=None):
         if id != key:
             raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                 segment, offset))
-        return data if read_data else size
+        data_size_from_header = size - header_size(tag)
+        if expected_size is not None and expected_size != data_size_from_header:
+            raise IntegrityError(f'size from repository index: {expected_size} != '
+                                 f'size from entry header: {data_size_from_header}')
+        return data if read_data else data_size_from_header
 
     def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
         """
diff --git a/src/borg/testsuite/hashindex.py b/src/borg/testsuite/hashindex.py
index 89a793a978..8721fc5666 100644
--- a/src/borg/testsuite/hashindex.py
+++ b/src/borg/testsuite/hashindex.py
@@ -87,8 +87,8 @@ def _generic_test(self, cls, make_value, sha):
         del idx
 
     def test_nsindex(self):
-        self._generic_test(NSIndex, lambda x: (x, x),
-                           '85f72b036c692c8266e4f51ccf0cff2147204282b5e316ae508d30a448d88fef')
+        self._generic_test(NSIndex, lambda x: (x, x, x, x),
+                           'c9fe5878800d2a0691b667c665a00d4a186e204e891076d6b109016940742bed')
 
     def test_chunkindex(self):
         self._generic_test(ChunkIndex, lambda x: (x, x, x),
@@ -102,7 +102,7 @@ def test_resize(self):
         initial_size = os.path.getsize(filepath)
         self.assert_equal(len(idx), 0)
         for x in range(n):
-            idx[H(x)] = x, x
+            idx[H(x)] = x, x, x, x
         idx.write(filepath)
         assert initial_size < os.path.getsize(filepath)
         for x in range(n):
@@ -114,7 +114,7 @@ def test_resize(self):
     def test_iteritems(self):
         idx = NSIndex()
         for x in range(100):
-            idx[H(x)] = x, x
+            idx[H(x)] = x, x, x, x
         iterator = idx.iteritems()
         all = list(iterator)
         self.assert_equal(len(all), 100)
@@ -515,9 +515,9 @@ class NSIndexTestCase(BaseTestCase):
     def test_nsindex_segment_limit(self):
        idx = NSIndex()
         with self.assert_raises(AssertionError):
-            idx[H(1)] = NSIndex.MAX_VALUE + 1, 0
+            idx[H(1)] = NSIndex.MAX_VALUE + 1, 0, 0, 0
         assert H(1) not in idx
-        idx[H(2)] = NSIndex.MAX_VALUE, 0
+        idx[H(2)] = NSIndex.MAX_VALUE, 0, 0, 0
         assert H(2) in idx
 
@@ -532,38 +532,38 @@ def test_bug_4829(self):
 
         from struct import pack
 
-        def HH(x, y):
-            # make some 32byte long thing that depends on x and y.
-            # same x will mean a collision in the hashtable as bucket index is computed from
-            # first 4 bytes. giving a specific x targets bucket index x.
-            # y is to create different keys and does not go into the bucket index calculation.
-            # so, same x + different y --> collision
-            return pack('<IIQQQ', x, y, 0, 0, 0)  # 2 * 4 + 3 * 8 == 32
+        def HH(x, y, z):
+            # make some 32byte long thing that depends on x, y, z.
+            # same x will mean a collision in the hashtable as bucket index is computed from
+            # first 4 bytes. giving a specific x targets bucket index x.
+            # y is to create different keys and does not go into the bucket index calculation.
+            # so, same x + different y --> collision
+            return pack('<IIQQQ', x, y, z, 0, 0)  # 2 * 4 + 3 * 8 == 32
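
---

For review convenience: the new `NSIndex` value is four little-endian uint32s (hence `value_size = 16`) — segment, offset, data size, and a spare `extra` field — and `hashindex_variant()` tells the two on-disk layouts apart by peeking at the key/value sizes stored in the 18-byte `HashHeader`. A minimal pure-Python sketch of that logic (illustrative only, not the Cython implementation; the real function also special-cases the unit-test magic `b'12345678'`):

```python
import struct
from collections import namedtuple

NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size extra')

def unpack_entry(buf):
    # new repo index value: 4 little-endian uint32s == 16 bytes
    return NSIndexEntry(*struct.unpack('<4I', buf))

def peek_variant(path):
    # like hashindex_variant(): read the 18-byte header, check the magic,
    # and derive the variant string from key_size/value_size at offsets 16/17
    with open(path, 'rb') as f:
        hh = f.read(18)
    if hh[0:8] != b'BORG_IDX':
        raise ValueError(f'unknown hashindex format, magic: {hh[0:8]!r}')
    return f'k{hh[16]}_v{hh[17]}'  # 'k32_v16' is current, 'k32_v8' is legacy
```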
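The quota and compaction arithmetic now derives the on-disk footprint of an entry as `header_size(tag) + len(data)` instead of baking the header overhead into the stored size, so the index can carry the plain data size. A rough sketch of that accounting (the real constants live on `LoggedIO`; the sizes and tag values below are assumptions for illustration):

```python
# assumed segment entry layout: 9-byte header (crc32, size, tag) + 32-byte id,
# plus an 8-byte entry hash for PUT2 entries -- illustrative values only
HEADER_SIZE = 9
HEADER_ID_SIZE = HEADER_SIZE + 32
ENTRY_HASH_SIZE = 8

TAG_PUT, TAG_DELETE, TAG_COMMIT, TAG_PUT2 = 0, 1, 2, 3  # hypothetical tag values

def header_size(tag):
    if tag == TAG_PUT2:
        return HEADER_ID_SIZE + ENTRY_HASH_SIZE
    if tag in (TAG_PUT, TAG_DELETE):
        return HEADER_ID_SIZE
    if tag == TAG_COMMIT:
        return HEADER_SIZE
    raise ValueError(f'unsupported tag: {tag!r}')

data = b'x' * 1000
print(header_size(TAG_PUT2) + len(data))  # 1049 charged to the quota,
# while the index entry stores only len(data) == 1000 in its size field
```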
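One subtle bit worth calling out: `get()` may run against a legacy index whose lookups return a 2-tuple, so it pads the value to four fields before reading; with a `None` size, the new `expected_size` check in `LoggedIO.read()` is simply skipped. A self-contained example of the padding trick used in `get()`:

```python
from collections import namedtuple

NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size extra')

legacy_value = (7, 1234)  # what a legacy borg 1.x NSIndex1 lookup yields
entry = NSIndexEntry(*((legacy_value + (None, None))[:4]))
assert entry == NSIndexEntry(7, 1234, None, None)  # size None -> no size check

new_value = NSIndexEntry(7, 1234, 1000, 0)  # what the new NSIndex yields
entry = NSIndexEntry(*((new_value + (None, None))[:4]))  # padding is a no-op
assert entry.size == 1000  # read() verifies this against the entry header
```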