diff --git a/CHANGES.md b/CHANGES.md index 08e7ccfc75fe..0c3c7817523b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -84,6 +84,7 @@ * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). * Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369])(https://github.com/apache/beam/pull/35369). +* SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change enables support for glob patterns and single-file paths ([#35582](https://github.com/apache/beam/pull/35582)). ## Deprecations diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java index 88d162c02370..bdbb48bf1b71 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java @@ -57,7 +57,8 @@ class ParquetTable extends SchemaBaseBeamTable implements Serializable { @Override public PCollection<Row> buildIOReader(PBegin begin) { final Schema schema = AvroUtils.toAvroSchema(table.getSchema()); - Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*"); + String filePattern = resolveFilePattern(table.getLocation()); + Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern); return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows()); } @@ -65,7 +66,8 @@ public PCollection<Row> buildIOReader(PBegin begin) { public PCollection<Row> buildIOReader( PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) { final Schema schema = AvroUtils.toAvroSchema(table.getSchema()); - Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*"); + String filePattern = resolveFilePattern(table.getLocation()); + Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern); if (!fieldNames.isEmpty()) { Schema projectionSchema = projectSchema(schema, fieldNames); LOG.info("Projecting fields schema: {}", projectionSchema); @@ -122,4 +124,11 @@ public IsBounded isBounded() { public ProjectSupport supportsProjects() { return ProjectSupport.WITH_FIELD_REORDERING; } + + private String resolveFilePattern(String location) { + if (location.endsWith("/")) { + return location + "*"; + } + return location; + } }
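The behavior change hinges entirely on the small `resolveFilePattern` helper above: a location ending in `/` gets a `*` appended, while glob and single-file locations pass through unchanged. A minimal, self-contained sketch of that convention (the helper below simply mirrors the private method added in this PR; the paths are made up for illustration):

```java
public class LocationPatternExample {
  // Mirrors the resolveFilePattern(...) helper added to ParquetTable in this PR.
  static String resolveFilePattern(String location) {
    if (location.endsWith("/")) {
      return location + "*"; // directory: read every file directly under it
    }
    return location; // glob or single file: used as-is
  }

  public static void main(String[] args) {
    System.out.println(resolveFilePattern("/data/orders/"));               // /data/orders/*
    System.out.println(resolveFilePattern("/data/orders/*.parquet"));      // unchanged glob
    System.out.println(resolveFilePattern("/data/orders/part-0.parquet")); // unchanged single file
  }
}
```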
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java index 71680f706fb5..63197be0a45e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java @@ -18,8 +18,11 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; @@ -27,8 +30,10 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -43,25 +48,35 @@ public class ParquetTableProviderTest { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)"; - private static final Schema TABLE_SCHEMA = Schema.builder() .addStringField("name") .addInt64Field("age") .addStringField("country") .build(); - private static final Schema PROJECTED_SCHEMA = - Schema.builder().addInt64Field("age").addStringField("country").build(); + + private static final Row ROW_1 = + Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build(); + private static final Row ROW_2 = + Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build(); + private static final List<Row> ALL_ROWS = Arrays.asList(ROW_1, ROW_2); + + private BeamSqlEnv env; + + @Before + public void setUp() { + env = BeamSqlEnv.inMemory(new ParquetTableProvider()); + } @Test - public void testWriteAndReadTable() { - File destinationFile = new File(tempFolder.getRoot(), "person-info/"); + public void testReadAndFilter() { + File destinationDir = new File(tempFolder.getRoot(), "person-info"); + String locationPath = destinationDir.getAbsolutePath() + File.separator; - BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider()); env.executeDdl( String.format( "CREATE EXTERNAL TABLE PersonInfo %s TYPE parquet LOCATION '%s'", - FIELD_NAMES, destinationFile.getAbsolutePath())); + FIELD_NAMES, locationPath)); BeamSqlRelUtils.toPCollection( writePipeline, @@ -69,32 +84,69 @@ public void testWriteAndReadTable() { "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')")); writePipeline.run().waitUntilFinish(); - PCollection<Row> rows = - BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM PersonInfo")); - PAssert.that(rows) - .containsInAnyOrder( - Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build(), - Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build()); - - PCollection<Row> filtered = + Schema projectedSchema = Schema.builder().addStringField("name").addInt64Field("age").build(); + PCollection<Row> filteredAndProjected = BeamSqlRelUtils.toPCollection( - readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age > 25")); - PAssert.that(filtered) - .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build()); - PCollection<Row> projected = - BeamSqlRelUtils.toPCollection( - readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo")); - PAssert.that(projected) - .containsInAnyOrder( - Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(), - Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build()); +
PAssert.that(filteredAndProjected) + .containsInAnyOrder(Row.withSchema(projectedSchema).addValues("John", 42L).build()); - PCollection<Row> filteredAndProjected = + readPipeline.run().waitUntilFinish(); + } + + @Test + public void testLocationPathConventions() { + File destinationDir = new File(tempFolder.getRoot(), "path-test-data"); + String writeLocation = destinationDir.getAbsolutePath() + File.separator; + env.executeDdl( + String.format( + "CREATE EXTERNAL TABLE TmpWriteTable %s TYPE parquet LOCATION '%s'", + FIELD_NAMES, writeLocation)); + BeamSqlRelUtils.toPCollection( + writePipeline, + env.parseQuery( + "INSERT INTO TmpWriteTable VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')")); + writePipeline.run().waitUntilFinish(); + + env.executeDdl( + String.format( + "CREATE EXTERNAL TABLE DirTable %s TYPE parquet LOCATION '%s'", + FIELD_NAMES, writeLocation)); + PCollection<Row> dirResult = + BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM DirTable")); + PAssert.that("Directory with '/' reads all files", dirResult).containsInAnyOrder(ALL_ROWS); + + String globPath = new File(destinationDir, "output-*").getAbsolutePath(); + env.executeDdl( + String.format( + "CREATE EXTERNAL TABLE GlobTable %s TYPE parquet LOCATION '%s'", + FIELD_NAMES, globPath)); + PCollection<Row> globResult = + BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM GlobTable")); + PAssert.that("Glob 'output-*' reads all files", globResult).containsInAnyOrder(ALL_ROWS); + + File[] writtenFiles = destinationDir.listFiles((dir, name) -> name.startsWith("output-")); + assertTrue( + "Test setup failed: No output files found", + writtenFiles != null && writtenFiles.length > 0); + String singleFilePath = writtenFiles[0].getAbsolutePath(); + + env.executeDdl( + String.format( + "CREATE EXTERNAL TABLE SingleFileTable %s TYPE parquet LOCATION '%s'", + FIELD_NAMES, singleFilePath)); + PCollection<Row> singleFileResult = BeamSqlRelUtils.toPCollection( - readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo WHERE age > 25")); - PAssert.that(filteredAndProjected) - .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build()); + readPipeline, env.parseQuery("SELECT * FROM SingleFileTable")); + + PCollection<Long> count = singleFileResult.apply(Count.globally()); + PAssert.thatSingleton(count) + .satisfies( + actualCount -> { + assertTrue("Count should be greater than 0", actualCount > 0L); + return null; + }); PipelineResult.State state = readPipeline.run().waitUntilFinish(); assertEquals(State.DONE, state); diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md index f6b48cdd960e..0b5f14a6ebcd 100644 --- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md +++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md @@ -70,6 +70,7 @@ tableElement: columnName fieldType [ NOT NULL ] * `bigtable` * `pubsub` * `kafka` + * `parquet` * `text` * `location`: The I/O specific location of the underlying table, specified as a [String @@ -549,6 +550,104 @@ Write Mode supports writing to a topic. For CSV only simple types are supported. +## Parquet + +### Syntax + +``` +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE parquet +LOCATION '/path/to/files/' +``` + +* `LOCATION`: The path to the Parquet file(s).
The interpretation of the path follows these conventions (each form is illustrated in the sketch after the Read Mode section): + * **Directory:** A path ending with a forward slash (`/`) is treated as a directory. Beam reads all files within that directory. Example: `'gs://my-bucket/orders/'`. + * **Glob Pattern:** A path containing wildcard characters (`*`, `?`, `[]`) is treated as a glob pattern that the underlying file system expands. Example: `'gs://my-bucket/orders/date=2025-*-??/*.parquet'`. + * **Single File:** A full path that does not end in a slash and contains no wildcards is treated as a path to a single file. Example: `'gs://my-bucket/orders/data.parquet'`. + +### Read Mode + +Supports reading from Parquet files specified by the `LOCATION`. Predicate and projection push-down are supported to improve performance.
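A short sketch of the three `LOCATION` forms, using the same in-memory SQL environment that the provider's tests use (the table names, column lists, and paths here are hypothetical, chosen only for illustration):

```java
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.meta.provider.parquet.ParquetTableProvider;

public class ParquetLocationDdlExample {
  public static void main(String[] args) {
    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());

    // Directory: trailing slash, reads every file directly under the directory.
    env.executeDdl(
        "CREATE EXTERNAL TABLE OrdersDir (order_id BIGINT, product_name VARCHAR) "
            + "TYPE parquet LOCATION '/data/orders/'");

    // Glob pattern: expanded by the underlying file system.
    env.executeDdl(
        "CREATE EXTERNAL TABLE OrdersGlob (order_id BIGINT, product_name VARCHAR) "
            + "TYPE parquet LOCATION '/data/orders/output-*'");

    // Single file: full path, no trailing slash, no wildcards.
    env.executeDdl(
        "CREATE EXTERNAL TABLE OrdersFile (order_id BIGINT, product_name VARCHAR) "
            + "TYPE parquet LOCATION '/data/orders/output-00000-of-00001.parquet'");
  }
}
```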
+ +### Write Mode + +Supports writing to a set of sharded Parquet files in a specified directory.
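A minimal write sketch under the same assumptions as above (the table name and directory are made up; `BeamSqlRelUtils` is the same helper the provider's tests use to turn a parsed query into a pipeline step):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.parquet.ParquetTableProvider;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParquetWriteExample {
  public static void main(String[] args) {
    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
    env.executeDdl(
        "CREATE EXTERNAL TABLE People (name VARCHAR, age BIGINT, country VARCHAR) "
            + "TYPE parquet LOCATION '/tmp/people/'");

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    // INSERT INTO writes a set of sharded Parquet files under the LOCATION directory.
    BeamSqlRelUtils.toPCollection(
        pipeline,
        env.parseQuery("INSERT INTO People VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
    pipeline.run().waitUntilFinish();
  }
}
```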
+ +### Schema + +The specified schema is used to read and write Parquet files. The schema is converted to an Avro schema internally for `ParquetIO`. Beam SQL types map to Avro types as follows: +
+| Beam SQL Type                      | Avro Type                  |
+| ---------------------------------- | -------------------------- |
+| TINYINT, SMALLINT, INTEGER, BIGINT | long                       |
+| FLOAT, DOUBLE                      | double                     |
+| DECIMAL                            | bytes (with logical type)  |
+| BOOLEAN                            | boolean                    |
+| DATE, TIME, TIMESTAMP              | long (with logical type)   |
+| CHAR, VARCHAR                      | string                     |
+| ARRAY                              | array                      |
+| ROW                                | record                     |
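To inspect the exact Avro schema that a given Beam SQL schema turns into, the conversion can be reproduced with the same `AvroUtils` helper that `ParquetTable` calls internally. A small sketch (the `AvroUtils` package has moved between Beam releases, so the import may need adjusting for your version; the field names are just examples):

```java
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; // older releases: org.apache.beam.sdk.schemas.utils.AvroUtils
import org.apache.beam.sdk.schemas.Schema;

public class SchemaConversionExample {
  public static void main(String[] args) {
    // Beam SQL table schema: (order_id BIGINT, product_name VARCHAR, purchase_ts TIMESTAMP)
    Schema beamSchema =
        Schema.builder()
            .addInt64Field("order_id")
            .addStringField("product_name")
            .addDateTimeField("purchase_ts")
            .build();

    // ParquetTable converts the Beam schema to Avro before handing it to ParquetIO.read(...).
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
    System.out.println(avroSchema.toString(true)); // pretty-printed Avro schema JSON
  }
}
```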
+ +### Example + +``` +CREATE EXTERNAL TABLE daily_orders ( + order_id BIGINT, + product_name VARCHAR, + purchase_ts TIMESTAMP +) +TYPE parquet +LOCATION '/gcs/my-data/orders/2025-07-14/*'; +``` + +--- + ## MongoDB ### Syntax