From 7498cfb04ca07fffec1cc447a2cf35f538e5eae9 Mon Sep 17 00:00:00 2001 From: Akarys Shorabek Date: Mon, 23 Jun 2025 17:31:37 +0500 Subject: [PATCH 1/3] Revert #35243 --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../io/external/xlang_jdbcio_it_test.py | 10 +++++ sdks/python/apache_beam/io/jdbc.py | 6 ++- sdks/python/apache_beam/typehints/schemas.py | 12 +----- sdks/python/apache_beam/yaml/main.py | 39 +++++++++++++------ 5 files changed, 45 insertions(+), 24 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 62905b12a707..023697768983 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 13 + "modification": 12 } diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 9f90a44d9a00..9aed0d5f11d5 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -36,6 +36,8 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant from apache_beam.utils.timestamp import Timestamp # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -253,6 +255,10 @@ def test_xlang_jdbc_write_read(self, database): classpath=config['classpath'], )) + # Register MillisInstant logical type to override the mapping from Timestamp + # originally handled by MicrosInstant. + LogicalType.register_logical_type(MillisInstant) + with TestPipeline() as p: p.not_use_test_runner_api = True result = ( @@ -349,6 +355,10 @@ def custom_row_equals(expected, actual): classpath=config['classpath'], )) + # Register MillisInstant logical type to override the mapping from Timestamp + # originally handled by MicrosInstant. + LogicalType.register_logical_type(MillisInstant) + # Run read pipeline with custom schema with TestPipeline() as p: p.not_use_test_runner_api = True diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 604b95f6eebe..32ce16b358f8 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -401,7 +401,7 @@ def __init__(self, argument=""): @classmethod def representation_type(cls) -> type: - return MillisInstant + return Timestamp @classmethod def urn(cls): @@ -417,6 +417,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp: value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().date() @classmethod @@ -444,7 +445,7 @@ def __init__(self, argument=""): @classmethod def representation_type(cls) -> type: - return MillisInstant + return Timestamp @classmethod def urn(cls): @@ -462,6 +463,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp: tzinfo=datetime.timezone.utc)) def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().time() @classmethod diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 90a692e21125..de4cdb9fdf75 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -335,10 +335,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: array_type=schema_pb2.ArrayType(element_type=element_type)) try: - if LogicalType.is_known_logical_type(type_): - logical_type = type_ - else: - logical_type = LogicalType.from_typing(type_) + logical_type = LogicalType.from_typing(type_) except ValueError: # Unknown type, just treat it like Any return schema_pb2.FieldType( @@ -672,7 +669,7 @@ def add(self, urn, logical_type): def get_logical_type_by_urn(self, urn): return self.by_urn.get(urn, None) - def get_urn_by_logical_type(self, logical_type): + def get_urn_by_logial_type(self, logical_type): return self.by_logical_type.get(logical_type, None) def get_logical_type_by_language_type(self, representation_type): @@ -811,11 +808,6 @@ def from_runner_api(cls, logical_type_proto): return logical_type() return logical_type(argument) - @classmethod - def is_known_logical_type(cls, logical_type): - return cls._known_logical_types.get_urn_by_logical_type( - logical_type) is not None - class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]): @classmethod diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index fbebaea4346f..35879514ce15 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -16,6 +16,7 @@ # import argparse +import contextlib import json import os import sys @@ -26,6 +27,8 @@ import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.transforms import resources +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant from apache_beam.yaml import yaml_testing from apache_beam.yaml import yaml_transform from apache_beam.yaml import yaml_utils @@ -133,12 +136,25 @@ def _pipeline_spec_from_args(known_args): return pipeline_yaml +@contextlib.contextmanager +def _fix_xlang_instant_coding(): + # Scoped workaround for https://github.com/apache/beam/issues/28151. + old_registry = LogicalType._known_logical_types + LogicalType._known_logical_types = old_registry.copy() + try: + LogicalType.register_logical_type(MillisInstant) + yield + finally: + LogicalType._known_logical_types = old_registry + + def run(argv=None): options, constructor, display_data = build_pipeline_components_from_argv(argv) - with beam.Pipeline(options=options, display_data=display_data) as p: - print('Building pipeline...') - constructor(p) - print('Running pipeline...') + with _fix_xlang_instant_coding(): + with beam.Pipeline(options=options, display_data=display_data) as p: + print('Building pipeline...') + constructor(p) + print('Running pipeline...') def run_tests(argv=None, exit=True): @@ -169,13 +185,14 @@ def run_tests(argv=None, exit=True): "If you haven't added a set of tests yet, you can get started by " 'running your pipeline with the --create_test flag enabled.') - tests = [ - yaml_testing.YamlTestCase( - pipeline_spec, test_spec, options, known_args.fix_tests) - for test_spec in test_specs - ] - suite = unittest.TestSuite(tests) - result = unittest.TextTestRunner().run(suite) + with _fix_xlang_instant_coding(): + tests = [ + yaml_testing.YamlTestCase( + pipeline_spec, test_spec, options, known_args.fix_tests) + for test_spec in test_specs + ] + suite = unittest.TestSuite(tests) + result = unittest.TextTestRunner().run(suite) if known_args.fix_tests or known_args.create_test: update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests) From abddb4877c87cbfe1aece42e1bc739f8be571f62 Mon Sep 17 00:00:00 2001 From: Akarys Shorabek Date: Mon, 23 Jun 2025 17:33:09 +0500 Subject: [PATCH 2/3] Revert #35191 --- .../beam_PostCommit_Yaml_Xlang_Direct.yml | 10 +++---- .../{databases => }/bigquery.yaml | 0 .../extended_tests/{data => }/enrichment.yaml | 0 .../extended_tests/{databases => }/jdbc.yaml | 0 .../extended_tests/{messaging => }/kafka.yaml | 0 .../extended_tests/{databases => }/mysql.yaml | 0 .../{databases => }/oracle.yaml | 0 .../{databases => }/postgres.yaml | 0 .../{messaging => }/pubsub.yaml | 0 .../{databases => }/spanner.yaml | 0 .../{databases => }/sqlserver.yaml | 0 .../extended_tests/{data => }/tfrecord.yaml | 0 .../apache_beam/yaml/integration_tests.py | 12 ++++---- sdks/python/build.gradle | 29 ++++--------------- sdks/python/setup.py | 3 +- 15 files changed, 16 insertions(+), 38 deletions(-) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/bigquery.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{data => }/enrichment.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/jdbc.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{messaging => }/kafka.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/mysql.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/oracle.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/postgres.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{messaging => }/pubsub.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/spanner.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{databases => }/sqlserver.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{data => }/tfrecord.yaml (100%) diff --git a/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml index 9215aba0f1de..8ec3c2bc7aaf 100644 --- a/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml @@ -61,15 +61,14 @@ jobs: matrix: job_name: ["beam_PostCommit_Yaml_Xlang_Direct"] job_phrase: ["Run Yaml_Xlang_Direct PostCommit"] - test_set: ["data", "databases", "messaging"] steps: - uses: actions/checkout@v4 - name: Setup repository uses: ./.github/actions/setup-action with: - comment_phrase: ${{ matrix.job_phrase }} ${{ matrix.test_set }} + comment_phrase: ${{ matrix.job_phrase }} github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} ${{ matrix.test_set }} (${{ matrix.job_phrase }} ${{ matrix.test_set }}) + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) - name: Setup environment uses: ./.github/actions/setup-environment-action with: @@ -80,7 +79,7 @@ jobs: - name: run PostCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:postCommitYamlIntegrationTests -PyamlTestSet=${{ matrix.test_set }} -PbeamPythonExtra=ml_test + gradle-command: :sdks:python:postCommitYamlIntegrationTests -PbeamPythonExtra=ml_test - name: Archive Python Test Results uses: actions/upload-artifact@v4 if: failure() @@ -94,5 +93,4 @@ jobs: commit: '${{ env.prsha || env.GITHUB_SHA }}' comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} files: '**/pytest*.xml' - large_files: true - + large_files: true \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml b/sdks/python/apache_beam/yaml/extended_tests/bigquery.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml rename to sdks/python/apache_beam/yaml/extended_tests/bigquery.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/data/enrichment.yaml b/sdks/python/apache_beam/yaml/extended_tests/enrichment.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/data/enrichment.yaml rename to sdks/python/apache_beam/yaml/extended_tests/enrichment.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/jdbc.yaml b/sdks/python/apache_beam/yaml/extended_tests/jdbc.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/jdbc.yaml rename to sdks/python/apache_beam/yaml/extended_tests/jdbc.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml b/sdks/python/apache_beam/yaml/extended_tests/kafka.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml rename to sdks/python/apache_beam/yaml/extended_tests/kafka.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/mysql.yaml b/sdks/python/apache_beam/yaml/extended_tests/mysql.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/mysql.yaml rename to sdks/python/apache_beam/yaml/extended_tests/mysql.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/oracle.yaml b/sdks/python/apache_beam/yaml/extended_tests/oracle.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/oracle.yaml rename to sdks/python/apache_beam/yaml/extended_tests/oracle.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/postgres.yaml b/sdks/python/apache_beam/yaml/extended_tests/postgres.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/postgres.yaml rename to sdks/python/apache_beam/yaml/extended_tests/postgres.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/messaging/pubsub.yaml b/sdks/python/apache_beam/yaml/extended_tests/pubsub.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/messaging/pubsub.yaml rename to sdks/python/apache_beam/yaml/extended_tests/pubsub.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/spanner.yaml b/sdks/python/apache_beam/yaml/extended_tests/spanner.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/spanner.yaml rename to sdks/python/apache_beam/yaml/extended_tests/spanner.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/sqlserver.yaml b/sdks/python/apache_beam/yaml/extended_tests/sqlserver.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/databases/sqlserver.yaml rename to sdks/python/apache_beam/yaml/extended_tests/sqlserver.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/data/tfrecord.yaml b/sdks/python/apache_beam/yaml/extended_tests/tfrecord.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/data/tfrecord.yaml rename to sdks/python/apache_beam/yaml/extended_tests/tfrecord.yaml diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 9a1c4478a587..9036e6a3d5c4 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -228,7 +228,7 @@ def temp_mysql_database(): with the MySQL database during setup. Exception: Any other exception encountered during the setup process. """ - with MySqlContainer(init=True) as mysql_container: + with MySqlContainer() as mysql_container: try: # Make connection to temp database and create tmp table engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) @@ -440,12 +440,12 @@ def temp_kafka_server(): Exception: If there's an error starting the Kafka container or interacting with the temporary Kafka server. """ - with KafkaContainer() as kafka_container: - try: + try: + with KafkaContainer() as kafka_container: yield kafka_container.get_bootstrap_server() - except Exception as err: - logging.error("Error interacting with temporary Kakfa Server: %s", err) - raise err + except Exception as err: + logging.error("Error interacting with temporary Kakfa Server: %s", err) + raise err @contextlib.contextmanager diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index c8f02262d3fd..accacb1953bb 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -138,13 +138,13 @@ tasks.register("yamlIntegrationTests") { doLast { exec { executable 'sh' - args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py" + args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py --deselect apache_beam/yaml/integration_tests.py::BigqueryTest::test_ReadFromBigQuery_ExternalJavaProvider_0" } } } tasks.register("postCommitYamlIntegrationTests") { - description "Runs postcommit integration tests for yaml pipelines - parameterized by yamlTestSet." + description "Runs postcommit integration tests for yaml pipelines." dependsOn installGcpTest // Need to build all expansion services referenced in apache_beam/yaml/*.* @@ -155,28 +155,9 @@ tasks.register("postCommitYamlIntegrationTests") { dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build" doLast { - def testSetInput = project.findProperty('yamlTestSet') ?: 'data,databases,messaging' - def testSetsToRun = testSetInput.tokenize(',').collect { it.trim() }.findAll { !it.isEmpty() } - testSetsToRun.each { currentTestSet -> - def test_files_dir - - switch (currentTestSet) { - case 'data': - test_files_dir = 'extended_tests/data' - break - case 'databases': - test_files_dir = 'extended_tests/databases' - break - case 'messaging': - test_files_dir = 'extended_tests/messaging' - break - default: - throw StopExecutionException("Unknown yamlTestSet: ${testSet}. Must be one of 'data', 'databases', or 'messaging'.") - } - exec { - executable 'sh' - args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py --deselect apache_beam/yaml/integration_tests.py::BigqueryTest::test_ReadFromBigQuery_ExternalJavaProvider_0 --test_files_dir='${test_files_dir}'" - } + exec { + executable 'sh' + args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py --deselect apache_beam/yaml/integration_tests.py::BigqueryTest::test_ReadFromBigQuery_ExternalJavaProvider_0 --test_files_dir='extended_tests'" } } } diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 6c799ba943bb..7a1def746d5c 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -444,8 +444,7 @@ def get_portability_package_data(): 'virtualenv-clone>=0.5,<1.0', 'mysql-connector-python>=9.3.0', 'python-tds>=1.16.1', - 'sqlalchemy-pytds>=1.0.2', - 'oracledb>=3.1.1' + 'sqlalchemy-pytds>=1.0.2' ], 'gcp': [ 'cachetools>=3.1.0,<6', From ad309b711eae5605a05825a328221d896f93ae35 Mon Sep 17 00:00:00 2001 From: Akarys Shorabek Date: Tue, 24 Jun 2025 16:18:49 +0500 Subject: [PATCH 3/3] Revert abddb4877c --- .../beam_PostCommit_Yaml_Xlang_Direct.yml | 10 ++++--- .../extended_tests/{ => data}/enrichment.yaml | 0 .../extended_tests/{ => data}/tfrecord.yaml | 0 .../{ => databases}/bigquery.yaml | 0 .../extended_tests/{ => databases}/jdbc.yaml | 0 .../extended_tests/{ => databases}/mysql.yaml | 0 .../{ => databases}/oracle.yaml | 0 .../{ => databases}/postgres.yaml | 0 .../{ => databases}/spanner.yaml | 0 .../{ => databases}/sqlserver.yaml | 0 .../extended_tests/{ => messaging}/kafka.yaml | 0 .../{ => messaging}/pubsub.yaml | 0 .../apache_beam/yaml/integration_tests.py | 12 ++++---- sdks/python/build.gradle | 29 +++++++++++++++---- sdks/python/setup.py | 3 +- 15 files changed, 38 insertions(+), 16 deletions(-) rename sdks/python/apache_beam/yaml/extended_tests/{ => data}/enrichment.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => data}/tfrecord.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/bigquery.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/jdbc.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/mysql.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/oracle.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/postgres.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/spanner.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => databases}/sqlserver.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => messaging}/kafka.yaml (100%) rename sdks/python/apache_beam/yaml/extended_tests/{ => messaging}/pubsub.yaml (100%) diff --git a/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml index 8ec3c2bc7aaf..9215aba0f1de 100644 --- a/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml @@ -61,14 +61,15 @@ jobs: matrix: job_name: ["beam_PostCommit_Yaml_Xlang_Direct"] job_phrase: ["Run Yaml_Xlang_Direct PostCommit"] + test_set: ["data", "databases", "messaging"] steps: - uses: actions/checkout@v4 - name: Setup repository uses: ./.github/actions/setup-action with: - comment_phrase: ${{ matrix.job_phrase }} + comment_phrase: ${{ matrix.job_phrase }} ${{ matrix.test_set }} github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + github_job: ${{ matrix.job_name }} ${{ matrix.test_set }} (${{ matrix.job_phrase }} ${{ matrix.test_set }}) - name: Setup environment uses: ./.github/actions/setup-environment-action with: @@ -79,7 +80,7 @@ jobs: - name: run PostCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:postCommitYamlIntegrationTests -PbeamPythonExtra=ml_test + gradle-command: :sdks:python:postCommitYamlIntegrationTests -PyamlTestSet=${{ matrix.test_set }} -PbeamPythonExtra=ml_test - name: Archive Python Test Results uses: actions/upload-artifact@v4 if: failure() @@ -93,4 +94,5 @@ jobs: commit: '${{ env.prsha || env.GITHUB_SHA }}' comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} files: '**/pytest*.xml' - large_files: true \ No newline at end of file + large_files: true + diff --git a/sdks/python/apache_beam/yaml/extended_tests/enrichment.yaml b/sdks/python/apache_beam/yaml/extended_tests/data/enrichment.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/enrichment.yaml rename to sdks/python/apache_beam/yaml/extended_tests/data/enrichment.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/tfrecord.yaml b/sdks/python/apache_beam/yaml/extended_tests/data/tfrecord.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/tfrecord.yaml rename to sdks/python/apache_beam/yaml/extended_tests/data/tfrecord.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/bigquery.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/bigquery.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/bigquery.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/jdbc.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/jdbc.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/jdbc.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/jdbc.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/mysql.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/mysql.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/mysql.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/mysql.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/oracle.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/oracle.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/oracle.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/oracle.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/postgres.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/postgres.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/postgres.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/postgres.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/spanner.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/spanner.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/spanner.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/spanner.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/sqlserver.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/sqlserver.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/sqlserver.yaml rename to sdks/python/apache_beam/yaml/extended_tests/databases/sqlserver.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/kafka.yaml b/sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/kafka.yaml rename to sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml diff --git a/sdks/python/apache_beam/yaml/extended_tests/pubsub.yaml b/sdks/python/apache_beam/yaml/extended_tests/messaging/pubsub.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/extended_tests/pubsub.yaml rename to sdks/python/apache_beam/yaml/extended_tests/messaging/pubsub.yaml diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 9036e6a3d5c4..9a1c4478a587 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -228,7 +228,7 @@ def temp_mysql_database(): with the MySQL database during setup. Exception: Any other exception encountered during the setup process. """ - with MySqlContainer() as mysql_container: + with MySqlContainer(init=True) as mysql_container: try: # Make connection to temp database and create tmp table engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) @@ -440,12 +440,12 @@ def temp_kafka_server(): Exception: If there's an error starting the Kafka container or interacting with the temporary Kafka server. """ - try: - with KafkaContainer() as kafka_container: + with KafkaContainer() as kafka_container: + try: yield kafka_container.get_bootstrap_server() - except Exception as err: - logging.error("Error interacting with temporary Kakfa Server: %s", err) - raise err + except Exception as err: + logging.error("Error interacting with temporary Kakfa Server: %s", err) + raise err @contextlib.contextmanager diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index accacb1953bb..c8f02262d3fd 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -138,13 +138,13 @@ tasks.register("yamlIntegrationTests") { doLast { exec { executable 'sh' - args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py --deselect apache_beam/yaml/integration_tests.py::BigqueryTest::test_ReadFromBigQuery_ExternalJavaProvider_0" + args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py" } } } tasks.register("postCommitYamlIntegrationTests") { - description "Runs postcommit integration tests for yaml pipelines." + description "Runs postcommit integration tests for yaml pipelines - parameterized by yamlTestSet." dependsOn installGcpTest // Need to build all expansion services referenced in apache_beam/yaml/*.* @@ -155,9 +155,28 @@ tasks.register("postCommitYamlIntegrationTests") { dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build" doLast { - exec { - executable 'sh' - args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py --deselect apache_beam/yaml/integration_tests.py::BigqueryTest::test_ReadFromBigQuery_ExternalJavaProvider_0 --test_files_dir='extended_tests'" + def testSetInput = project.findProperty('yamlTestSet') ?: 'data,databases,messaging' + def testSetsToRun = testSetInput.tokenize(',').collect { it.trim() }.findAll { !it.isEmpty() } + testSetsToRun.each { currentTestSet -> + def test_files_dir + + switch (currentTestSet) { + case 'data': + test_files_dir = 'extended_tests/data' + break + case 'databases': + test_files_dir = 'extended_tests/databases' + break + case 'messaging': + test_files_dir = 'extended_tests/messaging' + break + default: + throw StopExecutionException("Unknown yamlTestSet: ${testSet}. Must be one of 'data', 'databases', or 'messaging'.") + } + exec { + executable 'sh' + args '-c', "${envdir}/bin/pytest -v apache_beam/yaml/integration_tests.py --deselect apache_beam/yaml/integration_tests.py::BigqueryTest::test_ReadFromBigQuery_ExternalJavaProvider_0 --test_files_dir='${test_files_dir}'" + } } } } diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 7a1def746d5c..6c799ba943bb 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -444,7 +444,8 @@ def get_portability_package_data(): 'virtualenv-clone>=0.5,<1.0', 'mysql-connector-python>=9.3.0', 'python-tds>=1.16.1', - 'sqlalchemy-pytds>=1.0.2' + 'sqlalchemy-pytds>=1.0.2', + 'oracledb>=3.1.1' ], 'gcp': [ 'cachetools>=3.1.0,<6',