1 change: 1 addition & 0 deletions CHANGES.md
@@ -84,6 +84,7 @@

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369](https://github.com/apache/beam/pull/35369)).
* SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change was made to enable support for glob patterns and single-file paths ([#35582](https://github.com/apache/beam/pull/35582)).

## Deprecations

ParquetTable.java
@@ -57,15 +57,17 @@ class ParquetTable extends SchemaBaseBeamTable implements Serializable {
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
String filePattern = resolveFilePattern(table.getLocation());
Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
}

@Override
public PCollection<Row> buildIOReader(
PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
String filePattern = resolveFilePattern(table.getLocation());
Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
if (!fieldNames.isEmpty()) {
Schema projectionSchema = projectSchema(schema, fieldNames);
LOG.info("Projecting fields schema: {}", projectionSchema);
@@ -122,4 +124,11 @@ public IsBounded isBounded() {
public ProjectSupport supportsProjects() {
return ProjectSupport.WITH_FIELD_REORDERING;
}

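// A trailing '/' marks a directory and is expanded to a '*' glob; any other location
// (an explicit glob pattern or a single-file path) is passed through unchanged.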
private String resolveFilePattern(String location) {
if (location.endsWith("/")) {
return location + "*";
}
return location;
}
}
ParquetTableProviderTest.java
@@ -18,17 +18,22 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -43,58 +48,105 @@ public class ParquetTableProviderTest {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)";

private static final Schema TABLE_SCHEMA =
Schema.builder()
.addStringField("name")
.addInt64Field("age")
.addStringField("country")
.build();
private static final Schema PROJECTED_SCHEMA =
Schema.builder().addInt64Field("age").addStringField("country").build();

private static final Row ROW_1 =
Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build();
private static final Row ROW_2 =
Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build();
private static final List<Row> ALL_ROWS = Arrays.asList(ROW_1, ROW_2);

private BeamSqlEnv env;

@Before
public void setUp() {
env = BeamSqlEnv.inMemory(new ParquetTableProvider());
}

@Test
public void testWriteAndReadTable() {
File destinationFile = new File(tempFolder.getRoot(), "person-info/");
public void testReadAndFilter() {
File destinationDir = new File(tempFolder.getRoot(), "person-info");
String locationPath = destinationDir.getAbsolutePath() + File.separator;

BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE PersonInfo %s TYPE parquet LOCATION '%s'",
FIELD_NAMES, destinationFile.getAbsolutePath()));
FIELD_NAMES, locationPath));

BeamSqlRelUtils.toPCollection(
writePipeline,
env.parseQuery(
"INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
writePipeline.run().waitUntilFinish();

PCollection<Row> rows =
BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM PersonInfo"));
PAssert.that(rows)
.containsInAnyOrder(
Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build(),
Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());

PCollection<Row> filtered =
Schema projectedSchema = Schema.builder().addStringField("name").addInt64Field("age").build();
PCollection<Row> filteredAndProjected =
BeamSqlRelUtils.toPCollection(
readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age > 25"));
PAssert.that(filtered)
.containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());
readPipeline, env.parseQuery("SELECT name, age FROM PersonInfo WHERE age > 25"));

PCollection<Row> projected =
BeamSqlRelUtils.toPCollection(
readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo"));
PAssert.that(projected)
.containsInAnyOrder(
Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
PAssert.that(filteredAndProjected)
.containsInAnyOrder(Row.withSchema(projectedSchema).addValues("John", 42L).build());

PCollection<Row> filteredAndProjected =
readPipeline.run().waitUntilFinish();
}

@Test
public void testLocationPathConventions() {
File destinationDir = new File(tempFolder.getRoot(), "path-test-data");
String writeLocation = destinationDir.getAbsolutePath() + File.separator;
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE TmpWriteTable %s TYPE parquet LOCATION '%s'",
FIELD_NAMES, writeLocation));
BeamSqlRelUtils.toPCollection(
writePipeline,
env.parseQuery(
"INSERT INTO TmpWriteTable VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
writePipeline.run().waitUntilFinish();

env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE DirTable %s TYPE parquet LOCATION '%s'",
FIELD_NAMES, writeLocation));
PCollection<Row> dirResult =
BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM DirTable"));
PAssert.that("Directory with '/' reads all files", dirResult).containsInAnyOrder(ALL_ROWS);

String globPath = new File(destinationDir, "output-*").getAbsolutePath();
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE GlobTable %s TYPE parquet LOCATION '%s'",
FIELD_NAMES, globPath));
PCollection<Row> globResult =
BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM GlobTable"));
PAssert.that("Glob 'output-*' reads all files", globResult).containsInAnyOrder(ALL_ROWS);

File[] writtenFiles = destinationDir.listFiles((dir, name) -> name.startsWith("output-"));
assertTrue(
"Test setup failed: No output files found",
writtenFiles != null && writtenFiles.length > 0);
String singleFilePath = writtenFiles[0].getAbsolutePath();

env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE SingleFileTable %s TYPE parquet LOCATION '%s'",
FIELD_NAMES, singleFilePath));
PCollection<Row> singleFileResult =
BeamSqlRelUtils.toPCollection(
readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo WHERE age > 25"));
PAssert.that(filteredAndProjected)
.containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
readPipeline, env.parseQuery("SELECT * FROM SingleFileTable"));

PCollection<Long> count = singleFileResult.apply(Count.globally());
PAssert.thatSingleton(count)
.satisfies(
actualCount -> {
assertTrue("Count should be greater than 0", actualCount > 0L);
return null;
});

PipelineResult.State state = readPipeline.run().waitUntilFinish();
assertEquals(State.DONE, state);
create-external-table.md
@@ -70,6 +70,7 @@ tableElement: columnName fieldType [ NOT NULL ]
* `bigtable`
* `pubsub`
* `kafka`
* `parquet`
* `text`
* `location`: The I/O specific location of the underlying table, specified as
a [String
@@ -549,6 +550,104 @@ Write Mode supports writing to a topic.

For CSV, only simple types are supported.

## Parquet

### Syntax

```
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE parquet
LOCATION '/path/to/files/'
```

* `LOCATION`: The path to the Parquet file(s). The interpretation of the path is based on a convention (illustrated in the example definitions after this list):
* **Directory:** A path ending with a forward slash (`/`) is treated as a directory. Beam reads all files within that directory. Example: `'gs://my-bucket/orders/'`.
* **Glob Pattern:** A path containing wildcard characters (`*`, `?`, `[]`) is treated as a glob pattern that the underlying file system expands. Example: `'gs://my-bucket/orders/date=2025-*-??/*.parquet'`.
* **Single File:** A full path that does not end in a slash and contains no wildcards is treated as a path to a single file. Example: `'gs://my-bucket/orders/data.parquet'`.

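For illustration, the directory and single-file conventions might look like the following table definitions (the table names, columns, and paths below are placeholders, not part of the provider's API):

```
-- Directory: the trailing slash means every Parquet file in the directory is read.
CREATE EXTERNAL TABLE orders_dir (order_id BIGINT, amount DOUBLE)
TYPE parquet
LOCATION 'gs://my-bucket/orders/';

-- Single file: no trailing slash and no wildcards, so exactly one file is read.
CREATE EXTERNAL TABLE orders_file (order_id BIGINT, amount DOUBLE)
TYPE parquet
LOCATION 'gs://my-bucket/orders/data.parquet';
```
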
### Read Mode

Supports reading from Parquet files specified by the `LOCATION`. Predicate and projection push-down are supported to improve performance.

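As a sketch of a query shape that can benefit from push-down (projection of `order_id`, predicate on `amount`), using the hypothetical `orders_dir` table above:

```
SELECT order_id
FROM orders_dir
WHERE amount > 100.0;
```
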
### Write Mode

Supports writing to a set of sharded Parquet files in a specified directory.

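A hypothetical write into the `orders_dir` table sketched above; the rows are written out as sharded Parquet files under the table's directory:

```
INSERT INTO orders_dir VALUES (1, 120.5), (2, 75.0);
```
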
### Schema

The specified schema is used to read and write Parquet files. The schema is converted to an Avro schema internally for `ParquetIO`. Beam SQL types map to Avro types as follows:

<table>
<tr>
<td><b>Beam SQL Type</b>
</td>
<td><b>Avro Type</b>
</td>
</tr>
<tr>
<td>TINYINT, SMALLINT, INTEGER, BIGINT
</td>
<td>long
</td>
</tr>
<tr>
<td>FLOAT, DOUBLE
</td>
<td>double
</td>
</tr>
<tr>
<td>DECIMAL
</td>
<td>bytes (with logical type)
</td>
</tr>
<tr>
<td>BOOLEAN
</td>
<td>boolean
</td>
</tr>
<tr>
<td>DATE, TIME, TIMESTAMP
</td>
<td>long (with logical type)
</td>
</tr>
<tr>
<td>CHAR, VARCHAR
</td>
<td>string
</td>
</tr>
<tr>
<td>ARRAY
</td>
<td>array
</td>
</tr>
<tr>
<td>ROW
</td>
<td>record
</td>
</tr>
</table>

### Example

```
CREATE EXTERNAL TABLE daily_orders (
order_id BIGINT,
product_name VARCHAR,
purchase_ts TIMESTAMP
)
TYPE parquet
LOCATION '/gcs/my-data/orders/2025-07-14/*';
```

---

## MongoDB

### Syntax