diff --git a/.github/workflows/java_jni.yml b/.github/workflows/java_jni.yml index 48351f3c22a..10218157a17 100644 --- a/.github/workflows/java_jni.yml +++ b/.github/workflows/java_jni.yml @@ -78,3 +78,36 @@ jobs: if: success() && github.event_name == 'push' && github.repository == 'apache/arrow' continue-on-error: true run: archery docker push debian-java-jni + + docker_integration_python: + name: AMD64 Debian 9 Java C Data Interface Integration + runs-on: ubuntu-latest + if: ${{ !contains(github.event.pull_request.title, 'WIP') }} + timeout-minutes: 90 + steps: + - name: Checkout Arrow + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Fetch Submodules and Tags + run: ci/scripts/util_checkout.sh + - name: Free Up Disk Space + run: ci/scripts/util_cleanup.sh + - name: Cache Docker Volumes + uses: actions/cache@v2 + with: + path: .docker + key: maven-${{ hashFiles('java/**') }} + restore-keys: maven- + - name: Setup Python + uses: actions/setup-python@v1 + with: + python-version: 3.8 + - name: Setup Archery + run: pip install -e dev/archery[docker] + - name: Execute Docker Build + run: archery docker run conda-python-java-integration + - name: Docker Push + if: success() && github.event_name == 'push' && github.repository == 'apache/arrow' + continue-on-error: true + run: archery docker push conda-python-java-integration diff --git a/ci/scripts/java_cdata_integration.sh b/ci/scripts/java_cdata_integration.sh new file mode 100755 index 00000000000..86ea7cf1553 --- /dev/null +++ b/ci/scripts/java_cdata_integration.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -ex + +arrow_dir=${1} +export ARROW_SOURCE_DIR=${arrow_dir} + +pushd ${arrow_dir}/java/c/src/test/python + +python integration_tests.py + +popd diff --git a/docker-compose.yml b/docker-compose.yml index 93314e440a2..a9444a72b2b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -104,6 +104,7 @@ x-hierarchy: - conda-python-dask - conda-python-hdfs - conda-python-jpype + - conda-python-java-integration - conda-python-turbodbc - conda-python-kartothek - conda-python-spark @@ -993,6 +994,45 @@ services: /arrow/ci/scripts/java_build.sh /arrow /build && /arrow/ci/scripts/python_test.sh /arrow"] + conda-python-java-integration: + # Usage: + # docker-compose build conda + # docker-compose build conda-cpp + # docker-compose build conda-python + # docker-compose build conda-python-java-integration + # docker-compose run --rm conda-python-java-integration + image: ${REPO}:${ARCH}-conda-python-${PYTHON}-java-integration + build: + context: . 
+ dockerfile: ci/docker/conda-python-jpype.dockerfile + cache_from: + - ${REPO}:${ARCH}-conda-python-${PYTHON}-java-integration + args: + repo: ${REPO} + arch: ${ARCH} + python: ${PYTHON} + llvm: ${LLVM} + shm_size: *shm-size + environment: + <<: *ccache + ARROW_DATASET: "OFF" + ARROW_FLIGHT: "OFF" + ARROW_GANDIVA: "OFF" + ARROW_JAVA_CDATA: "ON" + ARROW_ORC: "OFF" + ARROW_PARQUET: "OFF" + ARROW_PLASMA: "OFF" + volumes: + - .:/arrow:delegated + - ${DOCKER_VOLUME_PREFIX}maven-cache:/root/.m2:delegated + - ${DOCKER_VOLUME_PREFIX}debian-ccache:/ccache:delegated + command: + [ "/arrow/ci/scripts/cpp_build.sh /arrow /build && + /arrow/ci/scripts/python_build.sh /arrow /build && + /arrow/ci/scripts/java_cdata_build.sh /arrow /build/java/c/build /build/java/c && + /arrow/ci/scripts/java_build.sh /arrow /build && + /arrow/ci/scripts/java_cdata_integration.sh /arrow" ] + conda-python-turbodbc: # Possible $TURBODBC parameters: # - `latest`: latest release diff --git a/java/c/src/test/python/integration_tests.py b/java/c/src/test/python/integration_tests.py new file mode 100644 index 00000000000..c1f130f21d4 --- /dev/null +++ b/java/c/src/test/python/integration_tests.py @@ -0,0 +1,223 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
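+
+# Integration tests for the Arrow Java C Data Interface.
+#
+# These tests start an in-process JVM via JPype and round-trip fields,
+# arrays, record batches, and dictionary-encoded data between pyarrow and
+# Arrow Java through the C Data Interface: ArrowSchema/ArrowArray structs
+# are exported on one side and imported on the other in both directions.
+# The JVM classpath is resolved from the Arrow Java pom.xml version (or
+# ARROW_TOOLS_JAR). In CI this module is invoked by
+# ci/scripts/java_cdata_integration.sh after the C++, Python, and Java
+# builds have completed.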
+ +import decimal +import gc +import os +import sys +import unittest +import xml.etree.ElementTree as ET + +import jpype +import pyarrow as pa +from pyarrow.cffi import ffi + + +def setup_jvm(): + # This test requires Arrow Java to be built in the same source tree + try: + arrow_dir = os.environ["ARROW_SOURCE_DIR"] + except KeyError: + arrow_dir = os.path.join(os.path.dirname( + __file__), '..', '..', '..', '..', '..') + pom_path = os.path.join(arrow_dir, 'java', 'pom.xml') + tree = ET.parse(pom_path) + version = tree.getroot().find( + 'POM:version', + namespaces={ + 'POM': 'http://maven.apache.org/POM/4.0.0' + }).text + jar_path = os.path.join( + arrow_dir, 'java', 'tools', 'target', + 'arrow-tools-{}-jar-with-dependencies.jar'.format(version)) + jar_path = os.getenv("ARROW_TOOLS_JAR", jar_path) + jar_path += ":{}".format(os.path.join(arrow_dir, + "java", "c/target/arrow-c-data-{}.jar".format(version))) + kwargs = {} + # This will be the default behaviour in jpype 0.8+ + kwargs['convertStrings'] = False + jpype.startJVM(jpype.getDefaultJVMPath(), "-Djava.class.path=" + jar_path, **kwargs) + + +class Bridge: + def __init__(self): + self.java_allocator = jpype.JPackage( + "org").apache.arrow.memory.RootAllocator(sys.maxsize) + self.java_c = jpype.JPackage("org").apache.arrow.c + + def java_to_python_field(self, jfield): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + self.java_c.Data.exportField(self.java_allocator, jfield, None, + self.java_c.ArrowSchema.wrap(ptr_schema)) + return pa.Field._import_from_c(ptr_schema) + + def java_to_python_array(self, vector, dictionary_provider=None): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + self.java_c.Data.exportVector(self.java_allocator, vector, dictionary_provider, self.java_c.ArrowArray.wrap( + ptr_array), self.java_c.ArrowSchema.wrap(ptr_schema)) + return pa.Array._import_from_c(ptr_array, ptr_schema) + + def java_to_python_record_batch(self, root): + c_schema = ffi.new("struct ArrowSchema*") + ptr_schema = int(ffi.cast("uintptr_t", c_schema)) + c_array = ffi.new("struct ArrowArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + self.java_c.Data.exportVectorSchemaRoot(self.java_allocator, root, None, self.java_c.ArrowArray.wrap( + ptr_array), self.java_c.ArrowSchema.wrap(ptr_schema)) + return pa.RecordBatch._import_from_c(ptr_array, ptr_schema) + + def python_to_java_field(self, field): + c_schema = self.java_c.ArrowSchema.allocateNew(self.java_allocator) + field._export_to_c(c_schema.memoryAddress()) + return self.java_c.Data.importField(self.java_allocator, c_schema, None) + + def python_to_java_array(self, array, dictionary_provider=None): + c_schema = self.java_c.ArrowSchema.allocateNew(self.java_allocator) + c_array = self.java_c.ArrowArray.allocateNew(self.java_allocator) + array._export_to_c(c_array.memoryAddress(), c_schema.memoryAddress()) + return self.java_c.Data.importVector(self.java_allocator, c_array, c_schema, dictionary_provider) + + def python_to_java_record_batch(self, record_batch): + c_schema = self.java_c.ArrowSchema.allocateNew(self.java_allocator) + c_array = self.java_c.ArrowArray.allocateNew(self.java_allocator) + record_batch._export_to_c( + c_array.memoryAddress(), c_schema.memoryAddress()) + return self.java_c.Data.importVectorSchemaRoot(self.java_allocator, c_array, c_schema, None) + + def close(self): + 
self.java_allocator.close() + + +class TestPythonIntegration(unittest.TestCase): + def setUp(self): + gc.collect() + self.old_allocated_python = pa.total_allocated_bytes() + self.bridge = Bridge() + + def tearDown(self): + self.bridge.close() + gc.collect() + diff_python = pa.total_allocated_bytes() - self.old_allocated_python + self.assertEqual( + pa.total_allocated_bytes(), self.old_allocated_python, + f"PyArrow memory was not adequately released: {diff_python} bytes lost") + + def round_trip_field(self, field_generator): + original_field = field_generator() + java_field = self.bridge.python_to_java_field(original_field) + del original_field + new_field = self.bridge.java_to_python_field(java_field) + del java_field + + expected = field_generator() + self.assertEqual(expected, new_field) + + def round_trip_array(self, array_generator, expected_diff=None): + original_arr = array_generator() + with self.bridge.java_c.CDataDictionaryProvider() as dictionary_provider, \ + self.bridge.python_to_java_array(original_arr, dictionary_provider) as vector: + del original_arr + new_array = self.bridge.java_to_python_array(vector, dictionary_provider) + + expected = array_generator() + if expected_diff: + self.assertEqual(expected, new_array.view(expected.type)) + self.assertEqual(expected.diff(new_array), expected_diff or '') + + def round_trip_record_batch(self, rb_generator): + original_rb = rb_generator() + with self.bridge.python_to_java_record_batch(original_rb) as root: + del original_rb + new_rb = self.bridge.java_to_python_record_batch(root) + + expected = rb_generator() + self.assertEqual(expected, new_rb) + + def test_string_array(self): + self.round_trip_array(lambda: pa.array([None, "a", "bb", "ccc"])) + + def test_decimal_array(self): + data = [ + round(decimal.Decimal(722.82), 2), + round(decimal.Decimal(-934.11), 2), + None, + ] + self.round_trip_array(lambda: pa.array(data, pa.decimal128(5, 2))) + + def test_int_array(self): + self.round_trip_array(lambda: pa.array([1, 2, 3], type=pa.int32())) + + def test_list_array(self): + self.round_trip_array(lambda: pa.array( + [[], [0], [1, 2], [4, 5, 6]], pa.list_(pa.int64()) + ), "# Array types differed: list vs list<$data$: int64>\n") + + def test_struct_array(self): + fields = [ + ("f1", pa.int32()), + ("f2", pa.string()), + ] + data = [ + {"f1": 1, "f2": "a"}, + None, + {"f1": 3, "f2": None}, + {"f1": None, "f2": "d"}, + {"f1": None, "f2": None}, + ] + self.round_trip_array(lambda: pa.array(data, type=pa.struct(fields))) + + def test_dict(self): + self.round_trip_array( + lambda: pa.array(["a", "b", None, "d"], pa.dictionary(pa.int64(), pa.utf8()))) + + def test_map(self): + offsets = [0, None, 2, 6] + pykeys = [b"a", b"b", b"c", b"d", b"e", b"f"] + pyitems = [1, 2, 3, None, 4, 5] + keys = pa.array(pykeys, type="binary") + items = pa.array(pyitems, type="i4") + self.round_trip_array( + lambda: pa.MapArray.from_arrays(offsets, keys, items)) + + def test_field(self): + self.round_trip_field(lambda: pa.field("aa", pa.bool_())) + + def test_field_nested(self): + self.round_trip_field(lambda: pa.field( + "test", pa.list_(pa.int32()), nullable=True)) + + def test_field_metadata(self): + self.round_trip_field(lambda: pa.field("aa", pa.bool_(), {"a": "b"})) + + def test_record_batch_with_list(self): + data = [ + pa.array([[1], [2], [3], [4, 5, 6]]), + pa.array([1, 2, 3, 4]), + pa.array(['foo', 'bar', 'baz', None]), + pa.array([True, None, False, True]) + ] + self.round_trip_record_batch( + lambda: pa.RecordBatch.from_arrays(data, ['f0', 'f1', 
'f2', 'f3'])) + + +if __name__ == '__main__': + setup_jvm() + unittest.main(verbosity=2)