From 729e3eddc91aa83b9a78adf0b209cecd65abece2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 15 Oct 2024 00:52:41 +0200 Subject: [PATCH 1/2] refactor: allow specification of gateway address via API parameter --- ipfsspec/async_ipfs.py | 11 ++++++++--- test/test_async.py | 5 +++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index a6dc270..726866d 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -180,13 +180,17 @@ def gateway_from_file(gateway_path, protocol="ipfs"): @lru_cache -def get_gateway(protocol="ipfs"): +def get_gateway(protocol="ipfs", gateway_addr=None): """ Get IPFS gateway according to IPIP-280 see: https://github.com/ipfs/specs/pull/280 """ + if gateway_addr: + logger.debug("using IPFS gateway as specified via function argument: %s", gateway_addr) + return AsyncIPFSGateway(gateway_addr, protocol) + # IPFS_GATEWAY environment variable should override everything ipfs_gateway = os.environ.get("IPFS_GATEWAY", "") if ipfs_gateway: @@ -263,19 +267,20 @@ class AsyncIPFSFileSystem(AsyncFileSystem): sep = "/" protocol = "ipfs" - def __init__(self, asynchronous=False, loop=None, client_kwargs=None, **storage_options): + def __init__(self, asynchronous=False, loop=None, client_kwargs=None, gateway_addr=None, **storage_options): super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options) self._session = None self.client_kwargs = client_kwargs or {} self.get_client = get_client + self.gateway_addr = gateway_addr if not asynchronous: sync(self.loop, self.set_session) @property def gateway(self): - return get_gateway(self.protocol) + return get_gateway(self.protocol, gateway_addr=self.gateway_addr) @staticmethod def close_session(loop, session): diff --git a/test/test_async.py b/test/test_async.py index ebaf6f3..afec6eb 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -22,9 +22,10 @@ async def get_client(**kwargs): @pytest_asyncio.fixture -async def fs(get_client): +async def fs(request, get_client): AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client) + gateway_addr = getattr(request, "param", None) + return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client, gateway_addr=gateway_addr) @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) From 30202110a8bfdf8832660302c209714a0809a891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Tue, 15 Oct 2024 01:13:54 +0200 Subject: [PATCH 2/2] request car instead of raw from gateways Trustless gateways may not always return raw IPLD blocks when a path is requested. They should however always return car. Fixes #39 --- ipfsspec/async_ipfs.py | 35 ++++++++++--- ipfsspec/car.py | 116 +++++++++++++++++++++++++++++++++++++++++ ipfsspec/utils.py | 21 ++++++++ test/test_async.py | 17 ++++++ 4 files changed, 181 insertions(+), 8 deletions(-) create mode 100644 ipfsspec/car.py create mode 100644 ipfsspec/utils.py diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 726866d..571503e 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -18,6 +18,7 @@ from multiformats import CID, multicodec from . import unixfsv1 +from .car import read_car import logging @@ -69,20 +70,30 @@ def __str__(self): return f"GW({self.url})" async def info(self, path, session): - res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"}) self._raise_not_found_for_status(res, path) - cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + + roots = res.headers["X-Ipfs-Roots"].split(",") + if len(roots) != len(path.split("/")): + raise FileNotFoundError(path) + + cid = CID.decode(roots[-1]) resdata = await res.read() + _, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/ + blocks = {cid: data for cid, data, _ in blocks} + block = blocks[cid] + if cid.codec == RawCodec: return { "name": path, "CID": str(cid), "type": "file", - "size": len(resdata), + "size": len(block), } elif cid.codec == DagPbCodec: - node = unixfsv1.PBNode.loads(resdata) + + node = unixfsv1.PBNode.loads(block) data = unixfsv1.Data.loads(node.Data) if data.Type == unixfsv1.DataType.Raw: raise FileNotFoundError(path) # this is not a file, it's only a part of it @@ -133,12 +144,20 @@ async def iter_chunked(self, path, session, chunk_size): yield size, res.content.iter_chunked(chunk_size) async def ls(self, path, session, detail=False): - res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"}) self._raise_not_found_for_status(res, path) - resdata = await res.read() - cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + roots = res.headers["X-Ipfs-Roots"].split(",") + if len(roots) != len(path.split("/")): + raise FileNotFoundError(path) + + cid = CID.decode(roots[-1]) assert cid.codec == DagPbCodec, "this is not a directory" - node = unixfsv1.PBNode.loads(resdata) + + resdata = await res.read() + + _, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/ + blocks = {cid: data for cid, data, _ in blocks} + node = unixfsv1.PBNode.loads(blocks[cid]) data = unixfsv1.Data.loads(node.Data) if data.Type != unixfsv1.DataType.Directory: # TODO: we might need support for HAMTShard here (for large directories) diff --git a/ipfsspec/car.py b/ipfsspec/car.py new file mode 100644 index 0000000..028e41e --- /dev/null +++ b/ipfsspec/car.py @@ -0,0 +1,116 @@ +""" +CAR handling functions. +""" + +from typing import List, Optional, Tuple, Union, Iterator, BinaryIO +import dataclasses + +import dag_cbor +from multiformats import CID, varint, multicodec, multihash + +from .utils import is_cid_list, StreamLike, ensure_stream + +DagPbCodec = multicodec.get("dag-pb") +Sha256Hash = multihash.get("sha2-256") + +@dataclasses.dataclass +class CARBlockLocation: + varint_size: int + cid_size: int + payload_size: int + offset: int = 0 + + @property + def cid_offset(self) -> int: + return self.offset + self.varint_size + + @property + def payload_offset(self) -> int: + return self.offset + self.varint_size + self.cid_size + + @property + def size(self) -> int: + return self.varint_size + self.cid_size + self.payload_size + + +def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]: + """ + Decodes a CAR header and returns the list of contained roots. + """ + header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase + header = dag_cbor.decode(stream.read(header_size)) + if not isinstance(header, dict): + raise ValueError("no valid CAR header found") + if header["version"] != 1: + raise ValueError("CAR is not version 1") + roots = header["roots"] + if not isinstance(roots, list): + raise ValueError("CAR header doesn't contain roots") + if not is_cid_list(roots): + raise ValueError("CAR roots do not only contain CIDs") + return roots, visize + header_size + + +def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]: + try: + block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase + except ValueError: + # stream has likely been consumed entirely + return None + + data = stream.read(block_size) + # as the size of the CID is variable but not explicitly given in + # the CAR format, we need to partially decode each CID to determine + # its size and the location of the payload data + if data[0] == 0x12 and data[1] == 0x20: + # this is CIDv0 + cid_version = 0 + default_base = "base58btc" + cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec + hash_codec: Union[int, multihash.Multihash] = Sha256Hash + cid_digest = data[2:34] + data = data[34:] + else: + # this is CIDv1(+) + cid_version, _, data = varint.decode_raw(data) + if cid_version != 1: + raise ValueError(f"CIDv{cid_version} is currently not supported") + default_base = "base32" + cid_codec, _, data = multicodec.unwrap_raw(data) + hash_codec, _, data = varint.decode_raw(data) + digest_size, _, data = varint.decode_raw(data) + cid_digest = data[:digest_size] + data = data[digest_size:] + cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest)) + + if not cid.hashfun.digest(data) == cid.digest: + raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified") + + return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data)) + + +def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]: + """ + Reads a CAR. + + Parameters + ---------- + stream_or_bytes: StreamLike + Stream to read CAR from + + Returns + ------- + roots : List[CID] + Roots as given by the CAR header + blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]] + Iterator over all blocks contained in the CAR + """ + stream = ensure_stream(stream_or_bytes) + roots, header_size = decode_car_header(stream) + def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]: + offset = header_size + while (next_block := decode_raw_car_block(stream)) is not None: + cid, data, sizes = next_block + yield cid, data, dataclasses.replace(sizes, offset=offset) + offset += sizes.size + return roots, blocks() diff --git a/ipfsspec/utils.py b/ipfsspec/utils.py new file mode 100644 index 0000000..d1bb255 --- /dev/null +++ b/ipfsspec/utils.py @@ -0,0 +1,21 @@ +""" +Some utilities. +""" + +from io import BytesIO +from typing import List, Union, BinaryIO + +from multiformats import CID +from typing_extensions import TypeGuard + +StreamLike = Union[BinaryIO, bytes] + +def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO: + if isinstance(stream_or_bytes, bytes): + return BytesIO(stream_or_bytes) + else: + return stream_or_bytes + + +def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]: + return all(isinstance(o, CID) for o in os) diff --git a/test/test_async.py b/test/test_async.py index afec6eb..4034d53 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -101,3 +101,20 @@ async def test_isfile(fs): assert res is True res = await fs._isfile(TEST_ROOT) assert res is False + +@pytest.mark.parametrize("detail", [False, True]) +@pytest.mark.parametrize("fs", ["http://127.0.0.1:8080", "https://ipfs.io"], indirect=True) +@pytest.mark.asyncio +async def test_ls_multi_gw(fs, detail): + """ + Test if ls works on different gateway implementations. + + See also: https://github.com/fsspec/ipfsspec/issues/39 + """ + res = await fs._ls("bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4", detail=detail) + expected = "bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4/welcome-to-IPFS.jpg" + if detail: + assert len(res) == 1 + assert res[0]["name"] == expected + else: + assert res == [expected]