From b3ca2b8e6864382defd56522a0cc4f741d0db18a Mon Sep 17 00:00:00 2001
From: Talat Uyarer
Date: Sun, 13 Jul 2025 22:29:31 -0700
Subject: [PATCH 1/3] Add glob and single file support to ParquetTable LOCATION

---
 .../meta/provider/parquet/ParquetTable.java   | 32 +++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
index 88d162c02370..32d4a7907af8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;

+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +32,8 @@
 import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.parquet.ParquetIO;
 import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
 import org.apache.beam.sdk.schemas.transforms.Convert;
@@ -57,7 +60,8 @@ class ParquetTable extends SchemaBaseBeamTable implements Serializable {
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
     final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
-    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    String filePattern = resolveFilePattern(table.getLocation());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
     return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
   }

@@ -65,7 +69,8 @@ public PCollection<Row> buildIOReader(PBegin begin) {
   public PCollection<Row> buildIOReader(
       PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
     final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
-    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    String filePattern = resolveFilePattern(table.getLocation());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
     if (!fieldNames.isEmpty()) {
       Schema projectionSchema = projectSchema(schema, fieldNames);
       LOG.info("Projecting fields schema: {}", projectionSchema);
@@ -122,4 +127,27 @@ public IsBounded isBounded() {
   public ProjectSupport supportsProjects() {
     return ProjectSupport.WITH_FIELD_REORDERING;
   }
+
+  private String resolveFilePattern(String location) {
+    try {
+      MatchResult match = FileSystems.match(location);
+      if (match.status() == MatchResult.Status.OK && !match.metadata().isEmpty()) {
+        MatchResult.Metadata metadata = match.metadata().get(0);
+        if (metadata.resourceId().isDirectory()) {
+          String dirPath = metadata.resourceId().toString();
+          if (dirPath.endsWith("/")) {
+            return dirPath + "*";
+          } else {
+            return dirPath + "/*";
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn(
+          "Failed to resolve path {}, assuming it is a glob. Error: {}", location, e.getMessage());
+    }
+    // It's a single file, a glob, or a path that couldn't be resolved.
+    // In all cases, we use the location string directly.
+    return location;
+  }
 }

From 393d6430764ab222b22a371f55af5e5c142ec20a Mon Sep 17 00:00:00 2001
From: Talat Uyarer
Date: Mon, 14 Jul 2025 13:52:45 -0700
Subject: [PATCH 2/3] Do not rely on FileSystems.match call in
 resolveFilePattern. Added test and documentation

---
 .../meta/provider/parquet/ParquetTable.java   |  23 +---
 .../parquet/ParquetTableProviderTest.java     | 110 +++++++++++++-----
 .../sql/extensions/create-external-table.md   |  99 ++++++++++++++++
 3 files changed, 182 insertions(+), 50 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
index 32d4a7907af8..bdbb48bf1b71 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;

-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,8 +31,6 @@
 import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.FileIO;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.parquet.ParquetIO;
 import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
 import org.apache.beam.sdk.schemas.transforms.Convert;
@@ -129,25 +126,9 @@ public ProjectSupport supportsProjects() {
   }

   private String resolveFilePattern(String location) {
-    try {
-      MatchResult match = FileSystems.match(location);
-      if (match.status() == MatchResult.Status.OK && !match.metadata().isEmpty()) {
-        MatchResult.Metadata metadata = match.metadata().get(0);
-        if (metadata.resourceId().isDirectory()) {
-          String dirPath = metadata.resourceId().toString();
-          if (dirPath.endsWith("/")) {
-            return dirPath + "*";
-          } else {
-            return dirPath + "/*";
-          }
-        }
-      }
-    } catch (IOException e) {
-      LOG.warn(
-          "Failed to resolve path {}, assuming it is a glob. Error: {}", location, e.getMessage());
+    if (location.endsWith("/")) {
+      return location + "*";
     }
-    // It's a single file, a glob, or a path that couldn't be resolved.
-    // In all cases, we use the location string directly.
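+    // A single file or an explicit glob pattern is used as-is.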
     return location;
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
index 71680f706fb5..63197be0a45e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
@@ -18,8 +18,11 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import java.io.File;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
@@ -27,8 +30,10 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -43,25 +48,35 @@ public class ParquetTableProviderTest {
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();

   private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)";
-
   private static final Schema TABLE_SCHEMA =
       Schema.builder()
           .addStringField("name")
           .addInt64Field("age")
           .addStringField("country")
           .build();
-  private static final Schema PROJECTED_SCHEMA =
-      Schema.builder().addInt64Field("age").addStringField("country").build();
+
+  private static final Row ROW_1 =
+      Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build();
+  private static final Row ROW_2 =
+      Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build();
+  private static final List<Row> ALL_ROWS = Arrays.asList(ROW_1, ROW_2);
+
+  private BeamSqlEnv env;
+
+  @Before
+  public void setUp() {
+    env = BeamSqlEnv.inMemory(new ParquetTableProvider());
+  }

   @Test
-  public void testWriteAndReadTable() {
-    File destinationFile = new File(tempFolder.getRoot(), "person-info/");
+  public void testReadAndFilter() {
+    File destinationDir = new File(tempFolder.getRoot(), "person-info");
+    String locationPath = destinationDir.getAbsolutePath() + File.separator;

-    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
     env.executeDdl(
         String.format(
             "CREATE EXTERNAL TABLE PersonInfo %s TYPE parquet LOCATION '%s'",
-            FIELD_NAMES, destinationFile.getAbsolutePath()));
+            FIELD_NAMES, locationPath));

     BeamSqlRelUtils.toPCollection(
         writePipeline,
@@ -69,32 +84,69 @@ public void testWriteAndReadTable() {
         env.parseQuery(
             "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
     writePipeline.run().waitUntilFinish();

-    PCollection<Row> rows =
-        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM PersonInfo"));
-    PAssert.that(rows)
-        .containsInAnyOrder(
-            Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build(),
-            Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());
-
-    PCollection<Row> filtered =
-        BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age > 25"));
-    PAssert.that(filtered)
-        .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());
-
-    PCollection<Row> projected =
-        BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo"));
-    PAssert.that(projected)
-        .containsInAnyOrder(
-            Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
-            Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
-
-    PCollection<Row> filteredAndProjected =
-        BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo WHERE age > 25"));
-    PAssert.that(filteredAndProjected)
-        .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
+    Schema projectedSchema = Schema.builder().addStringField("name").addInt64Field("age").build();
+    PCollection<Row> filteredAndProjected =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT name, age FROM PersonInfo WHERE age > 25"));
+
+    PAssert.that(filteredAndProjected)
+        .containsInAnyOrder(Row.withSchema(projectedSchema).addValues("John", 42L).build());
+
+    readPipeline.run().waitUntilFinish();
+  }
+
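+  // Exercises all three LOCATION conventions: a directory ending in '/', a glob
+  // pattern, and a single concrete file.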
+  @Test
+  public void testLocationPathConventions() {
+    File destinationDir = new File(tempFolder.getRoot(), "path-test-data");
+    String writeLocation = destinationDir.getAbsolutePath() + File.separator;
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE TmpWriteTable %s TYPE parquet LOCATION '%s'",
+            FIELD_NAMES, writeLocation));
+    BeamSqlRelUtils.toPCollection(
+        writePipeline,
+        env.parseQuery(
+            "INSERT INTO TmpWriteTable VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
+    writePipeline.run().waitUntilFinish();
+
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE DirTable %s TYPE parquet LOCATION '%s'",
+            FIELD_NAMES, writeLocation));
+    PCollection<Row> dirResult =
+        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM DirTable"));
+    PAssert.that("Directory with '/' reads all files", dirResult).containsInAnyOrder(ALL_ROWS);
+
+    String globPath = new File(destinationDir, "output-*").getAbsolutePath();
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE GlobTable %s TYPE parquet LOCATION '%s'",
+            FIELD_NAMES, globPath));
+    PCollection<Row> globResult =
+        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM GlobTable"));
+    PAssert.that("Glob 'output-*' reads all files", globResult).containsInAnyOrder(ALL_ROWS);
+
+    File[] writtenFiles = destinationDir.listFiles((dir, name) -> name.startsWith("output-"));
+    assertTrue(
+        "Test setup failed: No output files found",
+        writtenFiles != null && writtenFiles.length > 0);
+    String singleFilePath = writtenFiles[0].getAbsolutePath();
+
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE SingleFileTable %s TYPE parquet LOCATION '%s'",
+            FIELD_NAMES, singleFilePath));
+    PCollection<Row> singleFileResult =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT * FROM SingleFileTable"));
+
+    PCollection<Long> count = singleFileResult.apply(Count.globally());
+    PAssert.thatSingleton(count)
+        .satisfies(
+            actualCount -> {
+              assertTrue("Count should be greater than 0", actualCount > 0L);
+              return null;
+            });

     PipelineResult.State state = readPipeline.run().waitUntilFinish();
     assertEquals(State.DONE, state);
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index f6b48cdd960e..0b5f14a6ebcd 100644
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -70,6 +70,7 @@ tableElement: columnName fieldType [ NOT NULL ]
   * `bigtable`
   * `pubsub`
   * `kafka`
+  * `parquet`
   * `text`
 * `location`: The I/O specific location of the underlying table, specified as
   a [String
@@ -549,6 +550,104 @@ Write Mode supports writing to a topic.

 For CSV only simple types are supported.

+## Parquet
+
+### Syntax
+
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
+TYPE parquet
+LOCATION '/path/to/files/'
+```
+
+* `LOCATION`: The path to the Parquet file(s). The interpretation of the path is based on a convention (see the examples below):
+  * **Directory:** A path ending with a forward slash (`/`) is treated as a directory. Beam reads all files within that directory. Example: `'gs://my-bucket/orders/'`.
+  * **Glob Pattern:** A path containing wildcard characters (`*`, `?`, `[]`) is treated as a glob pattern that the underlying file system expands. Example: `'gs://my-bucket/orders/date=2025-*-??/*.parquet'`.
+  * **Single File:** A full path that does not end in a slash and contains no wildcards is treated as a path to a single file. Example: `'gs://my-bucket/orders/data.parquet'`.
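+
+For illustration, the three conventions map onto `CREATE EXTERNAL TABLE` statements as follows (bucket and file names are placeholders):
+
+```
+-- Directory: reads every file under orders/
+CREATE EXTERNAL TABLE orders_dir (order_id BIGINT)
+TYPE parquet
+LOCATION 'gs://my-bucket/orders/';
+
+-- Glob: the pattern is expanded by the underlying file system
+CREATE EXTERNAL TABLE orders_glob (order_id BIGINT)
+TYPE parquet
+LOCATION 'gs://my-bucket/orders/*.parquet';
+
+-- Single file: exactly one object is read
+CREATE EXTERNAL TABLE orders_file (order_id BIGINT)
+TYPE parquet
+LOCATION 'gs://my-bucket/orders/data.parquet';
+```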
+
+### Read Mode
+
+Supports reading from Parquet files specified by the `LOCATION`. Predicate and projection push-down are supported to improve performance.
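+
+For example, with the `daily_orders` table from the example at the end of this section, the following query only needs the `order_id` and `purchase_ts` columns, so the remaining columns are not read from the files (whether the filter is also applied during the read depends on the query plan):
+
+```
+SELECT order_id FROM daily_orders
+WHERE purchase_ts > TIMESTAMP '2025-07-14 00:00:00';
+```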
+
+### Write Mode
+
+Supports writing to a set of sharded Parquet files in a specified directory.
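+
+For example, rows can be written with a plain `INSERT` statement (the same form used in the provider's tests); file names and shard counts are chosen by the runner:
+
+```
+INSERT INTO daily_orders VALUES (1, 'wrench', TIMESTAMP '2025-07-14 10:00:00');
+```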
+
+### Schema
+
+The specified schema is used to read and write Parquet files. The schema is converted to an Avro schema internally for `ParquetIO`. Beam SQL types map to Avro types as follows:
+
+<table>
+  <tr>
+    <td>Beam SQL Type</td>
+    <td>Avro Type</td>
+  </tr>
+  <tr>
+    <td>TINYINT, SMALLINT, INTEGER, BIGINT</td>
+    <td>long</td>
+  </tr>
+  <tr>
+    <td>FLOAT, DOUBLE</td>
+    <td>double</td>
+  </tr>
+  <tr>
+    <td>DECIMAL</td>
+    <td>bytes (with logical type)</td>
+  </tr>
+  <tr>
+    <td>BOOLEAN</td>
+    <td>boolean</td>
+  </tr>
+  <tr>
+    <td>DATE, TIME, TIMESTAMP</td>
+    <td>long (with logical type)</td>
+  </tr>
+  <tr>
+    <td>CHAR, VARCHAR</td>
+    <td>string</td>
+  </tr>
+  <tr>
+    <td>ARRAY</td>
+    <td>array</td>
+  </tr>
+  <tr>
+    <td>ROW</td>
+    <td>record</td>
+  </tr>
+</table>
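+
+For example, given the `daily_orders` table below, `purchase_ts TIMESTAMP` is written as an Avro `long` with a logical type, and `product_name VARCHAR` as an Avro `string`, per the mapping above.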
+
+### Example
+
+```
+CREATE EXTERNAL TABLE daily_orders (
+  order_id BIGINT,
+  product_name VARCHAR,
+  purchase_ts TIMESTAMP
+)
+TYPE parquet
+LOCATION '/gcs/my-data/orders/2025-07-14/*';
+```
+
+---
+
 ## MongoDB

 ### Syntax

From e13dc0818539a16bcc20ff81ee6854d838739850 Mon Sep 17 00:00:00 2001
From: Talat Uyarer
Date: Mon, 14 Jul 2025 16:02:36 -0700
Subject: [PATCH 3/3] CHANGES.md update for behaviour change

---
 CHANGES.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGES.md b/CHANGES.md
index 08e7ccfc75fe..0c3c7817523b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -84,6 +84,7 @@

 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
 * Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369])(https://github.com/apache/beam/pull/35369).
+* SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change was made to enable support for glob patterns and single-file paths ([#35582](https://github.com/apache/beam/pull/35582)).

 ## Deprecations