@@ -46,6 +46,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+ import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -64,6 +65,7 @@
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.display.DisplayData;
+ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -73,6 +75,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+ import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
@@ -352,7 +355,6 @@ public static Write write() {
.setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES)
.setMaxNumMutations(DEFAULT_MAX_NUM_MUTATIONS)
.setMaxNumRows(DEFAULT_MAX_NUM_ROWS)
- .setGroupingFactor(DEFAULT_GROUPING_FACTOR)
.setFailureMode(FailureMode.FAIL_FAST)
.build();
}
@@ -783,7 +785,7 @@ public abstract static class Write extends PTransform<PCollection<Mutation>, Spa
@Nullable
abstract PCollection getSchemaReadySignal();

- abstract int getGroupingFactor();
+ abstract OptionalInt getGroupingFactor();

abstract Builder toBuilder();

@@ -967,8 +969,14 @@ private void populateDisplayDataWithParamaters(DisplayData.Builder builder) {
builder.add(
DisplayData.item("maxNumRows", getMaxNumRows())
.withLabel("Max number of rows in each batch"));
+ // Grouping factor default value depends on whether it is a batch or streaming pipeline.
+ // This function is not aware of that state, so use 'DEFAULT' if unset.
builder.add(
- DisplayData.item("groupingFactor", getGroupingFactor())
+ DisplayData.item(
+ "groupingFactor",
+ (getGroupingFactor().isPresent()
+ ? Integer.toString(getGroupingFactor().getAsInt())
+ : "DEFAULT"))
.withLabel("Number of batches to sort over"));
}
}
@@ -1033,7 +1041,11 @@ public SpannerWriteResult expand(PCollection<MutationGroup> input) {
// Filter out mutation groups too big to be batched.
PCollectionTuple filteredMutations =
input
.apply("To Global Window", Window.into(new GlobalWindows()))
.apply(
"RewindowIntoGlobal",
Window.<MutationGroup>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes())
.apply(
"Filter Unbatchable Mutations",
ParDo.of(
@@ -1059,7 +1071,12 @@ public SpannerWriteResult expand(PCollection<MutationGroup> input) {
spec.getBatchSizeBytes(),
spec.getMaxNumMutations(),
spec.getMaxNumRows(),
- spec.getGroupingFactor(),
+ // Do not group on streaming unless explicitly set.
+ spec.getGroupingFactor()
+ .orElse(
+ input.isBounded() == IsBounded.BOUNDED

Contributor:

I was wondering if this condition needs to be based on the input passed to this stage or based on some parameter from the user?

Contributor Author:

it's kinda both!
If the source is unbounded (streaming) and the groupingFactor has not been specified by the user, then default to no grouping.
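
A minimal sketch of the resulting behavior from the user's side (the project/instance/database IDs below are the same placeholders used in the tests): an explicitly set grouping factor is honored as-is, even on an unbounded input.

// Sketch only: an explicit withGroupingFactor() overrides the
// boundedness-based default, so this streaming write still sorts/groups
// 1000 batches at a time.
SpannerIO.Write write =
    SpannerIO.write()
        .withProjectId("test-project")
        .withInstanceId("test-instance")
        .withDatabaseId("test-database")
        .withGroupingFactor(1000); // unset => 1 (no grouping) when unbounded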

Member:

Is there any chance that someone using SpannerIO in a streaming pipeline is relying on the default grouping factor being 1000? I'm concerned this backwards-incompatible change could break them. Would it be sufficient to just give users the option to disable batching by setting the grouping factor to 1?

Contributor Author:

They can already set groupingFactor to 1 if they want...
Breaking backward compatibility: unlikely.

The default of 1000 causes OOMs when using streaming with wide windows and high throughput... When this happens, it is not always obvious that grouping is the issue...

With smaller windows/less throughput, it is much less likely that a group will be filled (groups are bounded by bundles, which are bounded by windows), so it is unlikely that anyone ever got to fill the group with 1000 batches.
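
A back-of-envelope sketch of that failure mode (the 1 MB batch size is an assumed default; the grouping factor of 1000 is the one discussed above):

public class GroupingBufferSketch {
  public static void main(String[] args) {
    long batchSizeBytes = 1024L * 1024L; // assumed default batch size (1 MB)
    long groupingFactor = 1000L;         // previous default grouping factor
    // Up to groupingFactor batches' worth of mutations are buffered per
    // bundle before sorting, so one wide-window, high-throughput bundle can
    // approach 1 GiB of buffered mutations.
    long worstCase = batchSizeBytes * groupingFactor;
    System.out.println("Worst-case buffered per bundle: " + (worstCase >> 20) + " MiB");
  }
}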

Member:

> They can already set groupingFactor to 1 if they want...

Ha yeah, sorry, that was unclear. At the time I thought that groupingFactor = 1 enabled the optimization in #11529, so I was wondering if this was really necessary since users could just enable it by setting the grouping factor manually. But I see now that grouping is separate from batching. And it's disabling batching that enables your other PR.

+ ? DEFAULT_GROUPING_FACTOR
+ : 1),

Member:

It would be nice if this were another constant. We could have DEFAULT_GROUPING_FACTOR_BOUNDED and DEFAULT_GROUPING_FACTOR_UNBOUNDED. It doesn't need to be done here, could be in a follow-up PR.

Contributor Author:

Will do in a separate PR.
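
A sketch of what that follow-up could look like (constant names are taken from the suggestion above; the values 1000 and 1 mirror the inline logic in this diff, so this is illustrative, not the committed change):

// Hypothetical follow-up constants:
private static final int DEFAULT_GROUPING_FACTOR_BOUNDED = 1000; // today's DEFAULT_GROUPING_FACTOR
private static final int DEFAULT_GROUPING_FACTOR_UNBOUNDED = 1;  // no grouping for streaming

// The call site would then become:
spec.getGroupingFactor()
    .orElse(
        input.isBounded() == IsBounded.BOUNDED
            ? DEFAULT_GROUPING_FACTOR_BOUNDED
            : DEFAULT_GROUPING_FACTOR_UNBOUNDED)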

schemaView))
.withSideInputs(schemaView))
.apply(
@@ -300,6 +300,54 @@ public void streamingWrites() throws Exception {
verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L)));
}

@Test
public void streamingWritesWithGrouping() throws Exception {

// verify that grouping/sorting occurs when set.
TestStream<Mutation> testStream =
TestStream.create(SerializableCoder.of(Mutation.class))
.addElements(m(1L), m(5L), m(2L), m(4L), m(3L), m(6L))
.advanceWatermarkToInfinity();
pipeline
.apply(testStream)
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withGroupingFactor(40)
.withMaxNumRows(2));
pipeline.run();

// Output should be batches of sorted mutations.
verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L)));
}

@Test
public void streamingWritesNoGrouping() throws Exception {

// verify that grouping/sorting does not occur - batches should be created in received order.
TestStream<Mutation> testStream =
TestStream.create(SerializableCoder.of(Mutation.class))
.addElements(m(1L), m(5L), m(2L), m(4L), m(3L), m(6L))
.advanceWatermarkToInfinity();

// verify that grouping/sorting does not occur when not set.
pipeline
.apply(testStream)
.apply(
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory)
.withMaxNumRows(2));
pipeline.run();

verifyBatches(batch(m(1L), m(5L)), batch(m(2L), m(4L)), batch(m(3L), m(6L)));
}

@Test
public void reportFailures() throws Exception {

@@ -608,7 +656,18 @@ public void displayDataWrite() throws Exception {
assertThat(data, hasDisplayItem("batchSizeBytes", 123));
assertThat(data, hasDisplayItem("maxNumMutations", 456));
assertThat(data, hasDisplayItem("maxNumRows", 789));
assertThat(data, hasDisplayItem("groupingFactor", 100));
assertThat(data, hasDisplayItem("groupingFactor", "100"));

// check for default grouping value
write =
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database");

data = DisplayData.from(write);
assertThat(data.items(), hasSize(7));
assertThat(data, hasDisplayItem("groupingFactor", "DEFAULT"));
}

@Test
@@ -632,7 +691,19 @@ public void displayDataWriteGrouped() throws Exception {
assertThat(data, hasDisplayItem("batchSizeBytes", 123));
assertThat(data, hasDisplayItem("maxNumMutations", 456));
assertThat(data, hasDisplayItem("maxNumRows", 789));
assertThat(data, hasDisplayItem("groupingFactor", 100));
assertThat(data, hasDisplayItem("groupingFactor", "100"));

// check for default grouping value
writeGrouped =
SpannerIO.write()
.withProjectId("test-project")
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.grouped();

data = DisplayData.from(writeGrouped);
assertThat(data.items(), hasSize(7));
assertThat(data, hasDisplayItem("groupingFactor", "DEFAULT"));
}

@Test