Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ To set up this plugin locally, first checkout the code. Then run the tests with
cd datasette-export-database
uv run pytest
```
To try the plugin out (with the permission set for every user):
```bash
uv run datasette test.db --create -s permissions.export-database true
```
26 changes: 18 additions & 8 deletions datasette_export_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import itsdangerous
from jinja2.filters import do_filesizeformat
import pathlib
import secrets
import shutil
import tempfile
import time

SIGNED_URL_TTL = 3600

TMP_PREFIX = "datasette-export-database-"
tmp_dir = tempfile.gettempdir()

Expand Down Expand Up @@ -66,11 +67,13 @@ async def export_database(datasette, request, send):
unsigned = datasette.unsign(signature, "export-database")
except itsdangerous.exc.BadSignature:
return Response.text("Bad signature", status=403)
# csrftoken should match
if not secrets.compare_digest(
request.cookies.get("ds_csrftoken") or "", unsigned["csrf"]
):
return Response.text("Signature csrftoken did not match", status=403)
if unsigned.get("exp", 0) < time.time():
return Response.text("Signature expired", status=403)
actor_id = (request.actor or {}).get("id")
if actor_id != unsigned.get("actor_id"):
return Response.text("Signature actor did not match", status=403)
if unsigned.get("database") != database:
return Response.text("Signature database did not match", status=403)

# Is there enough space in /tmp ?
db_size_bytes = pathlib.Path(db_path).stat().st_size
Expand Down Expand Up @@ -135,11 +138,18 @@ async def inner():
db = datasette.get_database(database)
if db.path is None:
return
# Signing with the csrftoken because even anonymous users will have one
# Sign a short-lived URL tied to the current actor
href = (
datasette.urls.database(database)
+ "/-/export-database?s="
+ datasette.sign({"csrf": request.scope["csrftoken"]()}, "export-database")
+ datasette.sign(
{
"actor_id": (actor or {}).get("id"),
"database": database,
"exp": int(time.time()) + SIGNED_URL_TTL,
},
"export-database",
)
)
return [
{
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ classifiers=[
]
requires-python = ">=3.10"
dependencies = [
"datasette>=1.0a21"
"datasette==1.0a27"
]

[project.urls]
Expand Down
49 changes: 30 additions & 19 deletions tests/test_export_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ def db_path(tmp_path_factory):
async def test_permissions_and_test_fetch(db_path, tmpdir):
datasette = Datasette([db_path])
datasette.root_enabled = True
await datasette.invoke_startup()
anon_response1 = await datasette.client.get("/data/-/export-database")
assert anon_response1.status_code == 403
anon_response2 = await datasette.client.get("/data")
assert "/export-database" not in anon_response2.text
# Now get the signed URL
cookies = {"ds_actor": datasette.sign({"a": {"id": "root"}}, "actor")}
root_response1 = await datasette.client.get("/data", cookies=cookies)
cookies["ds_csrftoken"] = root_response1.cookies["ds_csrftoken"]
assert "/export-database" in root_response1.text
# Now find the signature
signature = root_response1.text.split("/export-database?s=")[1].split('"')[0]
Expand Down Expand Up @@ -78,9 +78,9 @@ async def test_no_space(db_path, tmpdir, monkeypatch):
monkeypatch.setattr(shutil, "disk_usage", lambda x: (100, 50, 20))
datasette = Datasette([db_path])
datasette.root_enabled = True
await datasette.invoke_startup()
cookies = {"ds_actor": datasette.sign({"a": {"id": "root"}}, "actor")}
root_response1 = await datasette.client.get("/data", cookies=cookies)
cookies["ds_csrftoken"] = root_response1.cookies["ds_csrftoken"]
assert "/export-database" in root_response1.text
# Now find the signature
signature = root_response1.text.split("/export-database?s=")[1].split('"')[0]
Expand Down Expand Up @@ -112,33 +112,44 @@ async def test_cleans_up_stale_tmp_files_on_startup(db_path):
assert two_minutes_ago.exists()
# Starting Datasette should delete one but not the other
datasette = Datasette()
await datasette.client.get("/")
await datasette.invoke_startup()
assert not two_hours_ago.exists()
assert two_minutes_ago.exists()


@pytest.mark.asyncio
async def test_bad_csrftoken(db_path):
async def test_anonymous_with_permission(db_path):
# When export-database is granted to anon, the link is shown with a
# signature carrying actor_id=None, and any anonymous request can use it.
datasette = Datasette([db_path], config={"permissions": {"export-database": True}})
await datasette.invoke_startup()
page = await datasette.client.get("/data")
assert "/export-database" in page.text
signature = page.text.split("/export-database?s=")[1].split('"')[0]
response = await datasette.client.get("/data/-/export-database?s=" + signature)
assert response.status_code == 200


@pytest.mark.asyncio
async def test_signature_tied_to_actor(db_path):
# Use root's signed URL but send it with a different actor cookie.
datasette = Datasette([db_path])
datasette.root_enabled = True
cookies1 = {"ds_actor": datasette.sign({"a": {"id": "root"}}, "actor")}
cookies2 = {"ds_actor": datasette.sign({"a": {"id": "root"}}, "actor")}
root_response1 = await datasette.client.get("/data", cookies=cookies1)
cookies1["ds_csrftoken"] = root_response1.cookies["ds_csrftoken"]
root_response2 = await datasette.client.get("/data", cookies=cookies2)
cookies2["ds_csrftoken"] = root_response2.cookies["ds_csrftoken"]
signature1 = root_response1.text.split("/export-database?s=")[1].split('"')[0]
signature2 = root_response1.text.split("/export-database?s=")[1].split('"')[0]
# Trying to use signature2 to export database for root1 user will break
await datasette.invoke_startup()
root_cookies = {"ds_actor": datasette.sign({"a": {"id": "root"}}, "actor")}
other_cookies = {"ds_actor": datasette.sign({"a": {"id": "other"}}, "actor")}
root_response = await datasette.client.get("/data", cookies=root_cookies)
signature = root_response.text.split("/export-database?s=")[1].split('"')[0]
# A different actor cannot use root's signed URL
response = await datasette.client.get(
"/data/-/export-database?s=" + signature1,
cookies=cookies2,
"/data/-/export-database?s=" + signature,
cookies=other_cookies,
)
assert response.status_code == 403
assert response.text == "Signature csrftoken did not match"
# But signature1 works for root1 user
assert response.text == "Signature actor did not match"
# But root can
good_response = await datasette.client.get(
"/data/-/export-database?s=" + signature1,
cookies=cookies1,
"/data/-/export-database?s=" + signature,
cookies=root_cookies,
)
assert good_response.status_code == 200