diff --git a/.github/workflows/python-integration.yml b/.github/workflows/python-integration.yml
new file mode 100644
index 000000000000..7c6269112612
--- /dev/null
+++ b/.github/workflows/python-integration.yml
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: "Python CI"
+on:
+  push:
+    branches:
+    - 'master'
+    - '0.**'
+    tags:
+    - 'apache-iceberg-**'
+  pull_request:
+    paths:
+    - '.github/workflows/python-ci.yml'
+    - 'python/**'
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: ${{ github.event_name == 'pull_request' }}
+
+jobs:
+  integration-test:
+    runs-on: ubuntu-20.04
+
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 2
+      - shell: pwsh
+        id: check_file_changed
+        run: |
+          $diff = git diff --name-only HEAD^ HEAD
+          $SourceDiff = $diff | Where-Object { $_ -match '^python/dev/Dockerfile$' }
+          $HasDiff = $SourceDiff.Length -gt 0
+          Write-Host "::set-output name=docs_changed::$HasDiff"
+      - name: Restore image
+        id: cache-docker
+        uses: actions/cache@v3
+        with:
+          path: ci/cache/docker/python
+          key: cache-mintegration
+      - name: Update Image Cache if cache miss
+        if: steps.cache-docker.outputs.cache-hit != 'true' || steps.check_file_changed.outputs.docs_changed == 'True'
+        run: |
+          docker build -t python-integration python/dev/ && \
+          mkdir -p ci/cache/docker/python && \
+          docker image save python-integration --output ./ci/cache/docker/python/python-integration.tar
+      - name: Use Image Cache if cache hit
+        if: steps.cache-docker.outputs.cache-hit == 'true'
+        run: docker image load --input ./ci/cache/docker/python/python-integration.tar
+      - name: Run Apache-Spark setup
+        working-directory: ./python
+        run: |
+          docker-compose -f dev/docker-compose-integration.yml up -d
+          sleep 10
+      - name: Install poetry
+        run: pip install poetry
+      - uses: actions/setup-python@v4
+        with:
+          python-version: '3.9'
+          cache: poetry
+          cache-dependency-path: ./python/poetry.lock
+      - name: Install
+        working-directory: ./python
+        run: make install
+      - name: Tests
+        working-directory: ./python
+        run: make test-integration
+      - name: Show debug logs
+        if: ${{ failure() }}
+        run: docker-compose -f python/dev/docker-compose-integration.yml logs
\ No newline at end of file
diff --git a/python/Makefile b/python/Makefile
index 9e03b10403ac..d04466dff8bb 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -17,7 +17,7 @@
 
 install:
 	pip install poetry
-	poetry install -E pyarrow -E hive -E s3fs -E glue -E adlfs
+	poetry install -E pyarrow -E hive -E s3fs -E glue -E adlfs -E duckdb
 
 check-license:
 	./dev/check-license
@@ -26,21 +26,22 @@ lint:
 	poetry run pre-commit run --all-files
 
 test:
-	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3 and not adlfs" ${PYTEST_ARGS}
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m unmarked ${PYTEST_ARGS}
 	poetry run coverage report -m --fail-under=90
 	poetry run coverage html
 	poetry run coverage xml
 
 test-s3:
 	sh ./dev/run-minio.sh
-	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not adlfs" ${PYTEST_ARGS}
-	poetry run coverage report -m --fail-under=90
-	poetry run coverage html
-	poetry run coverage xml
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m s3 ${PYTEST_ARGS}
+
+test-integration:
+	docker-compose -f dev/docker-compose-integration.yml kill
+	docker-compose -f dev/docker-compose-integration.yml build
+	docker-compose -f dev/docker-compose-integration.yml up -d
+	sleep 20
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m integration ${PYTEST_ARGS}
 
 test-adlfs:
 	sh ./dev/run-azurite.sh
-	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3" ${PYTEST_ARGS}
-	poetry run coverage report -m --fail-under=90
-	poetry run coverage html
-	poetry run coverage xml
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m adlfs ${PYTEST_ARGS}
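Note that the `test` target now selects `-m unmarked`, but pytest has no built-in notion of "unmarked" tests: something has to attach that marker to every test carrying none of the custom markers. That mechanism is not part of this patch; the sketch below shows one way it could be done in `tests/conftest.py` (the hook is real pytest API, the marker set is an assumption mirroring `pyproject.toml`):

```python
# Hypothetical conftest.py hook, not part of this patch.
# Tags every collected test that has none of the project's custom markers
# with "unmarked", so `pytest -m unmarked` selects only the plain unit tests.
import pytest

CUSTOM_MARKERS = {"s3", "adlfs", "integration"}  # assumed to mirror pyproject.toml


def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
    for item in items:
        if CUSTOM_MARKERS.isdisjoint(mark.name for mark in item.iter_markers()):
            item.add_marker(pytest.mark.unmarked)
```

The `unmarked` marker would also need to be registered (it is not in the markers list added to `pyproject.toml` below), otherwise pytest emits an unknown-mark warning.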
diff --git a/python/dev/Dockerfile b/python/dev/Dockerfile
new file mode 100644
index 000000000000..13508dd668eb
--- /dev/null
+++ b/python/dev/Dockerfile
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-bullseye
+
+RUN apt-get -qq update && \
+    apt-get -qq install -y --no-install-recommends \
+    sudo \
+    curl \
+    vim \
+    unzip \
+    openjdk-11-jdk \
+    build-essential \
+    software-properties-common \
+    ssh && \
+    apt-get -qq clean && \
+    rm -rf /var/lib/apt/lists/*
+
+# Optional env variables
+ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
+ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
+ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.5-src.zip:$PYTHONPATH
+
+RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
+WORKDIR ${SPARK_HOME}
+
+ENV SPARK_VERSION=3.3.2
+
+RUN curl -s https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
+    && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
+    && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz
+
+# Download iceberg spark runtime
+RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar -Lo iceberg-spark-runtime-3.3_2.12-1.1.0.jar \
+    && mv iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/spark/jars
+
+# Download Java AWS SDK
+RUN curl -s https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.165/bundle-2.17.165.jar -Lo bundle-2.17.165.jar \
+    && mv bundle-2.17.165.jar /opt/spark/jars
+
+# Download URL connection client required for S3FileIO
+RUN curl -s https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.165/url-connection-client-2.17.165.jar -Lo url-connection-client-2.17.165.jar \
+    && mv url-connection-client-2.17.165.jar /opt/spark/jars
+
+COPY spark-defaults.conf /opt/spark/conf
+ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
+
+RUN chmod u+x /opt/spark/sbin/* && \
+    chmod u+x /opt/spark/bin/*
+
+COPY entrypoint.sh .
+COPY provision.py .
+
+ENTRYPOINT ["./entrypoint.sh"]
+CMD ["notebook"]
diff --git a/python/dev/docker-compose-integration.yml b/python/dev/docker-compose-integration.yml
new file mode 100644
index 000000000000..663303ed698f
--- /dev/null
+++ b/python/dev/docker-compose-integration.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+version: "3"
+
+services:
+  spark-iceberg:
+    image: python-integration
+    container_name: pyiceberg-spark
+    build: .
+    depends_on:
+      - rest
+      - minio
+    volumes:
+      - ./warehouse:/home/iceberg/warehouse
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    ports:
+      - 8888:8888
+      - 8080:8080
+    links:
+      - rest:rest
+      - minio:minio
+  rest:
+    image: tabulario/iceberg-rest:0.2.0
+    container_name: pyiceberg-rest
+    ports:
+      - 8181:8181
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_WAREHOUSE=s3a://warehouse/wh/
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+  minio:
+    image: minio/minio
+    container_name: pyiceberg-minio
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+    ports:
+      - 9001:9001
+      - 9000:9000
+    command: [ "server", "/data", "--console-address", ":9001" ]
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc
+    container_name: pyiceberg-mc
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      /usr/bin/mc mb minio/warehouse;
+      /usr/bin/mc policy set public minio/warehouse;
+      tail -f /dev/null
+      "
diff --git a/python/dev/entrypoint.sh b/python/dev/entrypoint.sh
new file mode 100755
index 000000000000..d777f8f5a284
--- /dev/null
+++ b/python/dev/entrypoint.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+start-master.sh -p 7077
+start-worker.sh spark://spark-iceberg:7077
+start-history-server.sh
+
+python3 ./provision.py
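The entrypoint starts the standalone Spark master, worker and history server and then launches the provisioning script, while the workflow sleeps 10 seconds and `make test-integration` sleeps 20 seconds after `docker-compose up -d` before running the tests. Polling the REST catalog until it answers is one alternative to the fixed sleeps; the sketch below is hypothetical and not part of this patch (the config endpoint path and timeout are assumptions):

```python
# Hypothetical readiness probe, not part of this patch.
# Polls the REST catalog until it responds instead of relying on a fixed sleep.
import time
import urllib.request


def wait_for_rest_catalog(uri: str = "http://localhost:8181/v1/config", timeout: int = 60) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(uri) as response:
                if response.status == 200:
                    return
        except OSError:
            pass  # service not up yet, retry
        time.sleep(1)
    raise TimeoutError(f"REST catalog at {uri} did not become ready within {timeout}s")


if __name__ == "__main__":
    wait_for_rest_catalog()
```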
diff --git a/python/dev/provision.py b/python/dev/provision.py
new file mode 100644
index 000000000000..ec84e87a23d4
--- /dev/null
+++ b/python/dev/provision.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+
+print("Create database")
+
+spark.sql(
+    """
+  CREATE DATABASE IF NOT EXISTS default;
+"""
+)
+
+spark.sql(
+    """
+  use default;
+"""
+)
+
+spark.sql(
+    """
+  DROP TABLE IF EXISTS test_null_nan;
+"""
+)
+
+spark.sql(
+    """
+  CREATE TABLE test_null_nan
+  USING iceberg
+  AS SELECT
+    1 AS idx,
+    float('NaN') AS col_numeric
+UNION ALL SELECT
+    2 AS idx,
+    null AS col_numeric
+UNION ALL SELECT
+    3 AS idx,
+    1 AS col_numeric
+"""
+)
+
+spark.sql(
+    """
+  CREATE TABLE test_null_nan_rewritten
+  USING iceberg
+  AS SELECT * FROM test_null_nan
+"""
+)
+
+spark.sql(
+    """
+  DROP TABLE IF EXISTS test_deletes;
+"""
+)
+
+spark.sql(
+    """
+  CREATE TABLE test_deletes
+  USING iceberg
+  TBLPROPERTIES (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read',
+    'write.merge.mode'='merge-on-read'
+  )
+  AS SELECT
+    1 AS idx,
+    True AS deleted
+UNION ALL SELECT
+    2 AS idx,
+    False AS deleted;
+"""
+)
+
+spark.sql(
+    """
+  DELETE FROM test_deletes WHERE deleted = True;
+"""
+)
+
+while True:
+    time.sleep(1)
diff --git a/python/dev/spark-defaults.conf b/python/dev/spark-defaults.conf
new file mode 100644
index 000000000000..28f93b15a609
--- /dev/null
+++ b/python/dev/spark-defaults.conf
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.catalog.demo                 org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.demo.type            rest
+spark.sql.catalog.demo.uri             http://rest:8181
+spark.sql.catalog.demo.io-impl         org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.demo.warehouse       s3a://warehouse/wh/
+spark.sql.catalog.demo.s3.endpoint     http://minio:9000
+spark.sql.defaultCatalog               demo
+spark.eventLog.enabled                 true
+spark.eventLog.dir                     /home/iceberg/spark-events
+spark.history.fs.logDirectory          /home/iceberg/spark-events
+spark.sql.catalogImplementation        in-memory
diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index 37bf298b3e32..07b59258dd54 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -428,11 +428,11 @@ def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc
 
     def visit_is_nan(self, term: BoundTerm[Any]) -> pc.Expression:
         ref = pc.field(term.ref().field.name)
-        return ref.is_null(nan_is_null=True) & ref.is_valid()
+        return pc.is_nan(ref)
 
     def visit_not_nan(self, term: BoundTerm[Any]) -> pc.Expression:
         ref = pc.field(term.ref().field.name)
-        return ~(ref.is_null(nan_is_null=True) & ref.is_valid())
+        return ~pc.is_nan(ref)
 
     def visit_is_null(self, term: BoundTerm[Any]) -> pc.Expression:
         return pc.field(term.ref().field.name).is_null(nan_is_null=False)
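The pyarrow.py change swaps the hand-built null/NaN expression for `pc.is_nan`. A small illustration of the resulting semantics on a column holding a NaN, a null, and an ordinary value (illustrative only, mirroring the `test_null_nan` data, not part of the test suite):

```python
# Illustrative check of pc.is_nan semantics on expression-based filters.
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

table = pa.table({"idx": [1, 2, 3], "col_numeric": [float("nan"), None, 1.0]})

nan_rows = ds.dataset(table).to_table(filter=pc.is_nan(pc.field("col_numeric")))
not_nan_rows = ds.dataset(table).to_table(filter=~pc.is_nan(pc.field("col_numeric")))

print(nan_rows["idx"].to_pylist())      # [1]  only the real NaN matches
print(not_nan_rows["idx"].to_pylist())  # [3]  the null row drops out as well,
                                        # because is_nan(null) evaluates to null
```

The null row falling out of the negated filter is the behaviour the skipped `NotNaN` integration test later in this patch runs into.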
= ("*",), case_sensitive: bool = True, snapshot_id: Optional[int] = None, options: Properties = EMPTY_DICT, diff --git a/python/pyproject.toml b/python/pyproject.toml index 6746f8a7defe..f65bc14dbb77 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -110,7 +110,8 @@ dynamodb = ["boto3"] [tool.pytest.ini_options] markers = [ "s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, --aws-secret-access-key, and --endpoint-url args)", - "adlfs: marks a test as requiring access to adlfs compliant storage (use with --adlfs.account-name, --adlfs.account-key, and --adlfs.endpoint args)" + "adlfs: marks a test as requiring access to adlfs compliant storage (use with --adlfs.account-name, --adlfs.account-key, and --adlfs.endpoint args)", + "integration: marks integration tests against Apache Spark" ] [tool.black] @@ -240,5 +241,9 @@ ignore_missing_imports = true module = "pyparsing.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "pyspark.*" +ignore_missing_imports = true + [tool.coverage.run] source = ['pyiceberg/'] diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py new file mode 100644 index 000000000000..3d498f0ec8c6 --- /dev/null +++ b/python/tests/test_integration.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 6746f8a7defe..f65bc14dbb77 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -110,7 +110,8 @@ dynamodb = ["boto3"]
 
 [tool.pytest.ini_options]
 markers = [
     "s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, --aws-secret-access-key, and --endpoint-url args)",
-    "adlfs: marks a test as requiring access to adlfs compliant storage (use with --adlfs.account-name, --adlfs.account-key, and --adlfs.endpoint args)"
+    "adlfs: marks a test as requiring access to adlfs compliant storage (use with --adlfs.account-name, --adlfs.account-key, and --adlfs.endpoint args)",
+    "integration: marks integration tests against Apache Spark"
 ]
 
@@ -240,5 +241,9 @@ ignore_missing_imports = true
 module = "pyparsing.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "pyspark.*"
+ignore_missing_imports = true
+
 [tool.coverage.run]
 source = ['pyiceberg/']
diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py
new file mode 100644
index 000000000000..3d498f0ec8c6
--- /dev/null
+++ b/python/tests/test_integration.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import math
+
+import pytest
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.expressions import IsNaN, NotNaN
+from pyiceberg.table import Table
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
+@pytest.fixture()
+def table_test_null_nan(catalog: Catalog) -> Table:
+    return catalog.load_table("default.test_null_nan")
+
+
+@pytest.fixture()
+def table_test_null_nan_rewritten(catalog: Catalog) -> Table:
+    return catalog.load_table("default.test_null_nan_rewritten")
+
+
+@pytest.mark.integration
+def test_pyarrow_nan(table_test_null_nan: Table) -> None:
+    arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")).to_arrow()
+    assert len(arrow_table) == 1
+    assert arrow_table["idx"][0].as_py() == 1
+    assert math.isnan(arrow_table["col_numeric"][0].as_py())
+
+
+@pytest.mark.integration
+def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None:
+    arrow_table = table_test_null_nan_rewritten.scan(
+        row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")
+    ).to_arrow()
+    assert len(arrow_table) == 1
+    assert arrow_table["idx"][0].as_py() == 1
+    assert math.isnan(arrow_table["col_numeric"][0].as_py())
+
+
+@pytest.mark.integration
+@pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162")
+def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None:
+    not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_arrow()
+    assert len(not_nan) == 2
+
+
+@pytest.mark.integration
+def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None:
+    con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan")
+    result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE isnan(col_numeric)").fetchone()
+    assert result[0] == 1
+    assert math.isnan(result[1])
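provision.py also creates `default.test_deletes` and deletes the row where `deleted = True`, but this patch adds no test for it. A hypothetical follow-up test could look like the sketch below; it assumes the scan applies the merge-on-read delete files, which may not hold yet, in which case it would need a skip marker like the `NotNaN` test above:

```python
# Hypothetical follow-up test for the provisioned test_deletes table, not part of this patch.
# provision.py removes the row with deleted = True, so only idx = 2 should remain,
# assuming the scan applies the merge-on-read delete files.
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.table import Table


@pytest.fixture()
def table_test_deletes(catalog: Catalog) -> Table:
    return catalog.load_table("default.test_deletes")


@pytest.mark.integration
def test_deletes_applied(table_test_deletes: Table) -> None:
    arrow_table = table_test_deletes.scan(selected_fields=("idx", "deleted")).to_arrow()
    assert arrow_table["idx"].to_pylist() == [2]
    assert arrow_table["deleted"].to_pylist() == [False]
```

Locally, `make test-integration` brings the compose stack up and runs everything marked `integration`.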