Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ API can be much simpler:
- delete: immediately remove an item from the store (given its key).
- move: implements renaming, soft delete/undelete, and moving to the current
nesting level.
- defrag: general-purpose defragmentation helper (copies blocks from existing items into a new item)
- stats: API call counters, time spent in API methods, data volume/throughput.
- latency/bandwidth emulator: can emulate higher latency (via BORGSTORE_LATENCY
[us]) and lower bandwidth (via BORGSTORE_BANDWIDTH [bit/s]) than what is
Expand Down Expand Up @@ -237,6 +238,7 @@ Use storage on a BorgStore REST server:
- Values: depends on backend used by the server
- Authentication: Optional Basic Auth is supported.
- hash: runs the hexdigest computation server-side.
- defrag: runs the defragmentation helper server-side.


REST Server
Expand Down
30 changes: 30 additions & 0 deletions src/borgstore/backends/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,36 @@ def delete(self, name: str) -> None:
def move(self, curr_name: str, new_name: str) -> None:
"""rename curr_name to new_name (overwrite target)"""

def defrag(self, sources, *, target=None, algorithm=None, namespace=None, levels=0) -> str:
    """
    Combine byte ranges from existing items into a new item.

    Similar to the higher-level Store.defrag method, with these differences:

    - source and target item names are with namespace.
    - if levels > 0, source and target item names are nested.

    :param sources: list of (name, offset, size) tuples; the ranges are
                    concatenated in list order to form the target content.
    :param target: name of the target item. if None, it is computed as the
                   hexdigest of the target content using <algorithm>.
    :param algorithm: hash algorithm name (as accepted by hashlib.new),
                      only used when target is None.
    :param namespace: optional namespace prefix for the target name.
    :param levels: nesting levels for the target name (0 = not nested).
    :raises ValueError: if neither target nor algorithm is given, or if
                        algorithm is not supported by hashlib.
    :return: the (namespaced / nested) target item name.
    """
    # default implementation: slow, but works for all backends.
    # might be overridden for performance.
    data = b"".join(self.load(source, offset=offset, size=size) for source, offset, size in sources)
    if target is None:
        if algorithm is None:
            raise ValueError("Either target or algorithm must be given for defrag")
        try:
            h = hashlib.new(algorithm)
        except (ValueError, TypeError) as err:
            # chain the original error so the root cause stays visible.
            raise ValueError(f"Unsupported hash algorithm: {algorithm}") from err
        h.update(data)
        target = h.hexdigest()
    if namespace:
        target = namespace.rstrip("/") + "/" + target
    if levels:
        # import lazily: only needed when nesting is requested.
        from ..utils.nesting import nest

        target = nest(target, levels)
    self.store(target, data)
    return target

def hash(self, name: str, algorithm: str = "sha256") -> str:
"""compute full-file hex digest of <name> content using <algorithm>"""
# default implementation: slow, but works for all backends.
Expand Down
17 changes: 17 additions & 0 deletions src/borgstore/backends/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
import re
import json
from typing import Iterator, Dict, Optional
from http import HTTPStatus as HTTP
from urllib.parse import unquote
Expand Down Expand Up @@ -211,6 +212,22 @@ def move(self, curr_name: str, new_name: str) -> None:
response = self._request("post", self._url(""), params={"cmd": "move", "current": curr_name, "new": new_name})
self._handle_response(response, f"{curr_name} -> {new_name}")

def defrag(self, sources, *, target=None, algorithm=None, namespace=None, levels=0) -> str:
    """Run the defragmentation helper server-side; returns the target item name."""
    self._assert_open()
    # only transmit the options the caller actually specified.
    options = [("target", target), ("algorithm", algorithm), ("namespace", namespace)]
    params = {"cmd": "defrag"}
    params.update({key: value for key, value in options if value is not None})
    if levels:
        params["levels"] = levels
    # request body: the source ranges, JSON-encoded.
    body = json.dumps(sources).encode("utf-8")
    response = self._request("post", self._url(""), params=params, data=body)
    self._handle_response(response, "defrag")
    return response.text

def hash(self, name: str, algorithm: str = "sha256") -> str:
self._assert_open()
validate_name(name)
Expand Down
23 changes: 23 additions & 0 deletions src/borgstore/server/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,29 @@ def do_POST(self):
self._handle_exception(e, f"hash {self.name}")
return

# handle "cmd=defrag": combine byte ranges of existing items into a new item.
if cmd == "defrag":
    # optional query parameters; levels defaults to 0 (no nesting).
    target = self.query.get("target", [None])[0]
    algorithm = self.query.get("algorithm", [None])[0]
    namespace = self.query.get("namespace", [None])[0]
    # NOTE(review): a non-integer "levels" value raises ValueError here,
    # outside the try below — confirm this is surfaced as intended.
    levels = int(self.query.get("levels", [0])[0])
    # at least one of target / algorithm is required (algorithm is used to
    # compute the target name when no explicit target is given).
    if not target and not algorithm:
        self.send_error(HTTP.BAD_REQUEST, "Missing target or algorithm for defrag")
        return
    try:
        # request body: JSON-encoded list of (name, offset, size) source ranges.
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        sources = json.loads(body)
        with self.server.backend:
            target = self.server.backend.defrag(
                sources, target=target, algorithm=algorithm, namespace=namespace, levels=levels
            )
        # respond with the resulting (possibly hash-derived / nested) target name.
        self.respond(HTTP.OK, data=target.encode("ascii"), content_type="text/plain")
    except ValueError as e:
        # covers malformed JSON bodies and backend validation errors
        # (missing target/algorithm, unsupported hash algorithm).
        self.send_error(HTTP.BAD_REQUEST, str(e))
    except Exception as e:
        self._handle_exception(e, "defrag")
    return

if self.name:
try:
content_length = int(self.headers.get("Content-Length", 0))
Expand Down
28 changes: 27 additions & 1 deletion src/borgstore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import time
from typing import Iterator, Optional

from .utils.nesting import nest
from .utils.nesting import nest, unnest
from .backends._base import ItemInfo, BackendBase
from .backends.errors import ObjectNotFound, NoBackendGiven, BackendURLInvalid # noqa
from .backends.posixfs import get_file_backend
Expand Down Expand Up @@ -282,6 +282,32 @@ def move(
with self._stats_updater("move", msg + f" [{nested_name!r}, {nested_new_name!r}]"):
self.backend.move(nested_name, nested_new_name)

def defrag(self, sources, *, target=None, algorithm=None, namespace=None, deleted=False) -> str:
    """
    efficiently create a new item (target) by combining blocks from existing items (sources)
    in the same namespace. item and target names are always without namespace.

    sources is a list of (name, block_offset, block_length) tuples. blocks will be processed
    in order of appearance in the list and their contents will be appended to the target item.

    if the target name is not given, algorithm must be given to compute the target name
    as hash(algorithm, target_content).hexdigest().

    :param deleted: if True, also look up soft-deleted source items.
    returns the target name.
    """
    # build the backend-level (namespaced, nested) names for all sources.
    prefix = (namespace + "/") if namespace else ""
    mapped_sources = [
        (self.find(prefix + source, deleted=deleted), offset, size) for source, offset, size in sources
    ]
    if target is not None:
        # NOTE(review): find() is also applied to the target, which usually
        # does not exist yet — confirm find maps a fresh name correctly.
        target = self.find(prefix + target, deleted=deleted)

    # NOTE(review): [-1] presumably picks the deepest configured nesting level
    # for this namespace; non-namespaced targets are never nested — verify
    # against the levels configuration.
    levels = self._get_levels(prefix)[-1] if prefix else 0
    backend_target = self.backend.defrag(
        mapped_sources, target=target, algorithm=algorithm, namespace=prefix.rstrip("/"), levels=levels
    )
    # strip nesting and namespace again: Store-level names are plain.
    return unnest(backend_target, namespace=prefix).removeprefix(prefix)

def list(self, name: str, deleted: bool = False) -> Iterator[ItemInfo]:
"""
List all names in the namespace <name>.
Expand Down
156 changes: 156 additions & 0 deletions tests/test_server_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,159 @@ def test_rest_server_hash(rest_server_with_auth):
be.hash("test/nonexistent")
finally:
be.close()


def test_rest_server_defrag(tmp_path):
    """End-to-end test of the REST server's "cmd=defrag" endpoint via raw HTTP:
    explicit target, error on missing target/algorithm, hash-derived target
    names, target-overrides-algorithm, nesting, and namespaces."""
    import json
    import requests

    backend_url = tmp_path.as_uri()
    address, port = "127.0.0.1", 0
    username, password = "testuser", "testpassword"

    server, thread = start_server(backend_url, address, port, username, password)
    host, assigned_port = server.server_address
    url = f"http://{host}:{assigned_port}/"
    headers = {"Accept": "application/vnd.x.borgstore.rest.v1"}
    auth = (username, password)

    try:
        # 1. Create backend
        requests.post(url + "?cmd=create", auth=auth, headers=headers).raise_for_status()

        # 2. Store some initial data
        requests.post(url + "file1", data=b"0123456789", auth=auth, headers=headers).raise_for_status()
        requests.post(url + "file2", data=b"abcdefghij", auth=auth, headers=headers).raise_for_status()

        # 3. Call defrag
        # We want to take "234" from file1 (offset 2, size 3) and "fg" from file2 (offset 5, size 2)
        # Expected result: "234fg"
        sources = [("file1", 2, 3), ("file2", 5, 2)]
        response = requests.post(
            url + "?cmd=defrag&target=targetfile", data=json.dumps(sources), auth=auth, headers=headers
        )
        response.raise_for_status()
        assert response.text == "targetfile"
        assert response.headers["Content-Type"] == "text/plain"

        # 4. Verify the result
        response = requests.get(url + "targetfile", auth=auth, headers=headers)
        response.raise_for_status()
        assert response.content == b"234fg"

        # 5. Test with empty list (an empty target item must still be created)
        response = requests.post(url + "?cmd=defrag&target=emptyfile", data=json.dumps([]), auth=auth, headers=headers)
        response.raise_for_status()
        response = requests.get(url + "emptyfile", auth=auth, headers=headers)
        assert response.content == b""

        # 6. Test with missing target (and no algorithm): server rejects with 400
        response = requests.post(url + "?cmd=defrag", data=json.dumps(sources), auth=auth, headers=headers)
        assert response.status_code == 400
        assert "Missing target or algorithm" in response.text

        # 7. Test with algorithm but no target: target name is the content hexdigest
        combined_data = b"234fg"
        algo = "sha256"
        expected_hash = hashlib.sha256(combined_data).hexdigest()
        response = requests.post(
            url + f"?cmd=defrag&algorithm={algo}", data=json.dumps(sources), auth=auth, headers=headers
        )
        response.raise_for_status()
        assert response.text == expected_hash
        assert response.headers["Content-Type"] == "text/plain"
        response = requests.get(url + expected_hash, auth=auth, headers=headers)
        assert response.content == combined_data

        # 8. Test that target overrides algorithm
        # Even if algorithm is provided, if target is also provided, target is used.
        response = requests.post(
            url + f"?cmd=defrag&target=override_target&algorithm={algo}",
            data=json.dumps(sources),
            auth=auth,
            headers=headers,
        )
        response.raise_for_status()
        assert response.text == "override_target"
        response = requests.get(url + "override_target", auth=auth, headers=headers)
        assert response.content == combined_data

        # 9. Test with levels=1 and algorithm: returned name is nested
        from borgstore.utils.nesting import nest

        response = requests.post(
            url + f"?cmd=defrag&algorithm={algo}&levels=1", data=json.dumps(sources), auth=auth, headers=headers
        )
        response.raise_for_status()
        expected_nested_res = nest(expected_hash, levels=1)
        assert response.text == expected_nested_res
        response = requests.get(url + expected_nested_res, auth=auth, headers=headers)
        assert response.content == combined_data

        # 10. Test with namespace, levels=1 and algorithm: name is namespaced, then nested
        namespace = "ns1"
        response = requests.post(
            url + f"?cmd=defrag&algorithm={algo}&namespace={namespace}&levels=1",
            data=json.dumps(sources),
            auth=auth,
            headers=headers,
        )
        response.raise_for_status()
        expected_nested_res_ns = nest(namespace + "/" + expected_hash, levels=1)
        assert response.text == expected_nested_res_ns
        response = requests.get(url + expected_nested_res_ns, auth=auth, headers=headers)
        assert response.content == combined_data

    finally:
        # always shut the server down, even if an assertion failed.
        server.shutdown()
        server.server_close()


def test_rest_backend_defrag(rest_server_with_auth):
    """Exercise the REST backend's defrag API: explicit target, hash-derived
    target name, empty sources, error cases, nesting, and namespaces."""
    backend = rest_server_with_auth
    backend.create()
    backend.open()
    try:
        backend.store("file1", b"0123456789")
        backend.store("file2", b"abcdefghij")

        ranges = [("file1", 2, 3), ("file2", 5, 2)]
        combined = b"234fg"
        digest = hashlib.sha256(combined).hexdigest()

        # explicit target name
        assert backend.defrag(ranges, target="target1") == "target1"
        assert backend.load("target1") == combined

        # target name computed from the content hash
        assert backend.defrag(ranges, algorithm="sha256") == digest
        assert backend.load(digest) == combined

        # empty source list yields an empty item
        assert backend.defrag([], target="empty") == "empty"
        assert backend.load("empty") == b""

        # neither target nor algorithm given
        with pytest.raises(ValueError, match="Missing target or algorithm"):
            backend.defrag(ranges)

        # unknown hash algorithm
        with pytest.raises(ValueError, match="Unsupported hash algorithm"):
            backend.defrag(ranges, algorithm="invalid")

        from borgstore.utils.nesting import nest

        # nested target name computed from the content hash
        nested = backend.defrag(ranges, algorithm="sha256", levels=1)
        assert nested == nest(digest, levels=1)
        assert backend.load(nested) == combined

        # namespaced + nested target
        nested_ns = backend.defrag(ranges, algorithm="sha256", namespace="ns1", levels=1)
        assert nested_ns == nest("ns1/" + digest, levels=1)
        assert backend.load(nested_ns) == combined

    finally:
        backend.close()
22 changes: 22 additions & 0 deletions tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,28 @@ def test_basics(posixfs_store_created):
assert list(store.list(ns)) == []


def test_defrag_nested(posixfs_store_created):
    """Store.defrag in a nested namespace: explicit target and hash-derived target."""
    namespace = "two"  # nested! LEVELS_CONFIG has {"two/": [2]}
    with posixfs_store_created as store:
        store.store(f"{namespace}/file1", b"0123456789")
        store.store(f"{namespace}/file2", b"abcdefghij")

        ranges = [("file1", 2, 3), ("file2", 5, 2)]
        combined = b"234fg"

        # 1. explicit target name
        result = store.defrag(ranges, target="target1", namespace=namespace)
        assert result == "target1"
        assert store.load(f"{namespace}/{result}") == combined

        # 2. target name derived from the content hash (auto target name)
        result = store.defrag(ranges, algorithm="sha256", namespace=namespace)
        assert result == hashlib.sha256(combined).hexdigest()
        assert store.load(f"{namespace}/{result}") == combined


def test_hash(posixfs_store_created):
ns = "two"
k0 = key(0)
Expand Down
Loading