Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,8 @@ public static FileNaming relativeFileNaming(

abstract boolean getNoSpilling();

abstract @Nullable Integer getMaxNumWritersPerBundle();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<DestinationT, UserT> toBuilder();
Expand Down Expand Up @@ -1093,6 +1095,9 @@ abstract Builder<DestinationT, UserT> setSharding(

abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);

abstract Builder<DestinationT, UserT> setMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle);

abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

Expand Down Expand Up @@ -1326,6 +1331,15 @@ public Write<DestinationT, UserT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/**
* Set the maximum number of writers created in a bundle before spilling to shuffle. See {@link
* WriteFiles#withMaxNumWritersPerBundle()}.
*/
public Write<DestinationT, UserT> withMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle) {
return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}

/**
* Configures a new {@link Write} with an ErrorHandler. For configuring an ErrorHandler, see
* {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination
Expand Down Expand Up @@ -1424,6 +1438,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
resolvedSpec.setAutoSharding(getAutoSharding());
resolvedSpec.setNoSpilling(getNoSpilling());
if (getMaxNumWritersPerBundle() != null) {
resolvedSpec.setMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
}

Write<DestinationT, UserT> resolved = resolvedSpec.build();
WriteFiles<UserT, DestinationT, ?> writeFiles =
Expand All @@ -1445,6 +1462,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
if (getMaxNumWritersPerBundle() != null) {
writeFiles = writeFiles.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
}
if (getBadRecordErrorHandler() != null) {
writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ public abstract static class Write extends PTransform<PCollection<byte[]>, PDone
/** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */
abstract boolean getNoSpilling();

/** Maximum number of writers created in a bundle before spilling to shuffle. */
abstract @Nullable Integer getMaxNumWritersPerBundle();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -290,6 +293,8 @@ abstract static class Builder {

abstract Builder setNoSpilling(boolean noSpilling);

abstract Builder setMaxNumWritersPerBundle(@Nullable Integer maxNumWritersPerBundle);

abstract Write build();
}

Expand Down Expand Up @@ -388,6 +393,11 @@ public Write withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** See {@link WriteFiles#withMaxNumWritersPerBundle()}. */
public Write withMaxNumWritersPerBundle(@Nullable Integer maxNumWritersPerBundle) {
return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}

@Override
public PDone expand(PCollection<byte[]> input) {
checkState(
Expand All @@ -403,6 +413,9 @@ public PDone expand(PCollection<byte[]> input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
if (getMaxNumWritersPerBundle() != null) {
write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
}
input.apply("Write", write);
return PDone.in(input.getPipeline());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public static TFRecordWriteSchemaTransformConfiguration.Builder builder() {
@Nullable
public abstract Boolean getNoSpilling();

@SchemaFieldDescription(
"Maximum number of writers created in a bundle before spilling to shuffle.")
@Nullable
public abstract Integer getMaxNumWritersPerBundle();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
@Nullable
public abstract ErrorHandling getErrorHandling();
Expand All @@ -99,6 +104,8 @@ public abstract static class Builder {

public abstract Builder setNoSpilling(Boolean value);

public abstract Builder setMaxNumWritersPerBundle(@Nullable Integer maxNumWritersPerBundle);

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

/** Builds the {@link TFRecordWriteSchemaTransformConfiguration} configuration. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (Boolean.TRUE.equals(configuration.getNoSpilling())) {
writeTransform = writeTransform.withNoSpilling();
}
if (configuration.getMaxNumWritersPerBundle() != null) {
writeTransform =
writeTransform.withMaxNumWritersPerBundle(configuration.getMaxNumWritersPerBundle());
}

// Obtain input schema and verify only one field and its bytes
Schema inputSchema = input.get(INPUT).getSchema();
Expand Down
21 changes: 21 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,9 @@ public abstract static class TypedWrite<UserT, DestinationT>
/** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */
abstract boolean getNoSpilling();

/** Maximum number of writers created in a bundle before spilling to shuffle. */
abstract @Nullable Integer getMaxNumWritersPerBundle();

/** Whether to skip writing any output files if the PCollection is empty. */
abstract boolean getSkipIfEmpty();

Expand Down Expand Up @@ -779,6 +782,9 @@ abstract Builder<UserT, DestinationT> setBatchMaxBufferingDuration(

abstract Builder<UserT, DestinationT> setNoSpilling(boolean noSpilling);

abstract Builder<UserT, DestinationT> setMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle);

abstract Builder<UserT, DestinationT> setSkipIfEmpty(boolean noSpilling);

abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
Expand Down Expand Up @@ -1062,6 +1068,12 @@ public TypedWrite<UserT, DestinationT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** Set the maximum number of writers created in a bundle before spilling to shuffle. */
public TypedWrite<UserT, DestinationT> withMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle) {
return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}

/** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */
public TypedWrite<UserT, DestinationT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
Expand Down Expand Up @@ -1161,6 +1173,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
if (getMaxNumWritersPerBundle() != null) {
write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
}
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
Expand All @@ -1187,6 +1202,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder
.addIfNotNull(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"))
.addIfNotNull(DisplayData.item("maxNumWritersPerBundle", getMaxNumWritersPerBundle()))
.addIfNotNull(
DisplayData.item("tempDirectory", getTempDirectory())
.withLabel("Directory for temporary files"))
Expand Down Expand Up @@ -1348,6 +1364,11 @@ public Write withNoSpilling() {
return new Write(inner.withNoSpilling());
}

/** See {@link TypedWrite#withMaxNumWritersPerBundle(Integer)}. */
public Write withMaxNumWritersPerBundle(@Nullable Integer maxNumWritersPerBundle) {
return new Write(inner.withMaxNumWritersPerBundle(maxNumWritersPerBundle));
}

/** See {@link TypedWrite#withBatchSize(Integer)}. */
public Write withBatchSize(@Nullable Integer batchSize) {
return new Write(inner.withBatchSize(batchSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
/** Set the maximum number of writers created in a bundle before spilling to shuffle. */
public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
int maxNumWritersPerBundle) {
checkArgument(
getMaxNumWritersPerBundle() != -1,
"Cannot use withMaxNumWritersPerBundle() after withNoSpilling() has been set.");
checkArgument(
maxNumWritersPerBundle > 0 && maxNumWritersPerBundle <= DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
"maxNumWritersPerBundle must be greater than 0 and less than or equal to %s",
DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public void testWriteFindTransformAndMakeItWork() {
"num_shards",
"compression",
"no_spilling",
"max_num_writers_per_bundle",
"error_handling"),
tfrecordProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ public void testWriteDisplayData() {
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
.withNumShards(100)
.withMaxNumWritersPerBundle(5)
.withFooter("myFooter")
.withHeader("myHeader");

Expand All @@ -661,6 +662,7 @@ public void testWriteDisplayData() {
assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("maxNumWritersPerBundle", 5));
assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,8 @@ public abstract static class TypedWrite<UserT, DestinationT, OutputT>

abstract boolean getNoSpilling();

abstract @Nullable Integer getMaxNumWritersPerBundle();

abstract @Nullable FilenamePolicy getFilenamePolicy();

abstract @Nullable DynamicAvroDestinations<UserT, DestinationT, OutputT>
Expand Down Expand Up @@ -1483,6 +1485,9 @@ public Builder<UserT, DestinationT, OutputT> setGenericRecords(boolean genericRe

abstract Builder<UserT, DestinationT, OutputT> setNoSpilling(boolean noSpilling);

abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle);

abstract Builder<UserT, DestinationT, OutputT> setFilenamePolicy(
FilenamePolicy filenamePolicy);

Expand Down Expand Up @@ -1690,6 +1695,12 @@ public TypedWrite<UserT, DestinationT, OutputT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** See {@link WriteFiles#withMaxNumWritersPerBundle()}. */
public TypedWrite<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle) {
return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}

/** Writes to Avro file(s) compressed using specified codec. */
public TypedWrite<UserT, DestinationT, OutputT> withCodec(CodecFactory codec) {
return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
Expand Down Expand Up @@ -1799,6 +1810,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
if (getMaxNumWritersPerBundle() != null) {
write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
}
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.commons.csv.CSVFormat;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* {@link PTransform}s for reading and writing CSV files.
Expand Down Expand Up @@ -550,6 +551,16 @@ public Write<T> withNoSpilling() {
return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
}

/**
* Set the maximum number of writers created in a bundle before spilling to shuffle. See {@link
* WriteFiles#withMaxNumWritersPerBundle()}.
*/
public Write<T> withMaxNumWritersPerBundle(@Nullable Integer maxNumWritersPerBundle) {
return toBuilder()
.setTextIOWrite(getTextIOWrite().withMaxNumWritersPerBundle(maxNumWritersPerBundle))
.build();
}

/**
* Specifies to use a given fixed number of shards per window. See {@link
* TextIO.Write#withNumShards}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* {@link PTransform}s for reading and writing JSON files.
Expand Down Expand Up @@ -170,6 +171,16 @@ public Write<T> withNoSpilling() {
return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
}

/**
* Set the maximum number of writers created in a bundle before spilling to shuffle. See {@link
* WriteFiles#withMaxNumWritersPerBundle()}.
*/
public Write<T> withMaxNumWritersPerBundle(@Nullable Integer maxNumWritersPerBundle) {
return toBuilder()
.setTextIOWrite(getTextIOWrite().withMaxNumWritersPerBundle(maxNumWritersPerBundle))
.build();
}

/**
* Specifies to use a given fixed number of shards per window. See {@link
* TextIO.Write#withNumShards}.
Expand Down
7 changes: 6 additions & 1 deletion sdks/standard_external_transforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# configuration in /sdks/standard_expansion_services.yaml.
# Refer to gen_xlang_wrappers.py for more info.
#
# Last updated on: 2025-04-24
# Last updated on: 2025-06-05

- default_service: sdks:java:io:expansion-service:shadowJar
description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
Expand Down Expand Up @@ -91,6 +91,11 @@
name: filename_suffix
nullable: true
type: str
- description: Maximum number of writers created in a bundle before spilling to
shuffle.
name: max_num_writers_per_bundle
nullable: true
type: int32
- description: Whether to skip the spilling of data caused by having maxNumWritersPerBundle.
name: no_spilling
nullable: true
Expand Down
Loading