diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index aff1e4cede3b..d9debbdcb3d7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -19,6 +19,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -526,7 +527,7 @@ public void evaluate(
       }
 
       /**
-       * Returns the table to write, or {@code null} if reading from a query instead.
+       * Returns the table to read, or {@code null} if reading from a query instead.
        */
      public TableReference getTable() {
        return table;
@@ -931,37 +932,34 @@ private static void verifyTableEmpty(
       public void validate(PCollection<TableRow> input) {
         BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
-        TableReference table = getTable();
-        if (table == null && tableRefFunction == null) {
-          throw new IllegalStateException(
-              "must set the table reference of a BigQueryIO.Write transform");
-        }
-        if (table != null && tableRefFunction != null) {
-          throw new IllegalStateException(
-              "Cannot set both a table reference and a table function for a BigQueryIO.Write "
-                  + "transform");
-        }
+        // Exactly one of the table reference and table function can be configured.
+        checkState(
+            jsonTableRef != null || tableRefFunction != null,
+            "must set the table reference of a BigQueryIO.Write transform");
+        checkState(
+            jsonTableRef == null || tableRefFunction == null,
+            "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+                + " transform");
 
-        if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) {
-          throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
-              + "however no schema was provided.");
-        }
+        // Require a schema if creating one or more tables.
+        checkArgument(
+            createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+        // The user specified a table.
+        if (jsonTableRef != null && validate) {
+          TableReference table = getTable();
 
-        if (table != null && table.getProjectId() == null) {
           // If user does not specify a project we assume the table to be located in the project
-          // that owns the Dataflow job.
-          String projectIdFromOptions = options.getProject();
-          LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
-              table.getTableId(), projectIdFromOptions));
-          table.setProjectId(projectIdFromOptions);
-        }
+          // configured in BigQueryOptions.
+          if (Strings.isNullOrEmpty(table.getProjectId())) {
+            table.setProjectId(options.getProject());
+          }
 
-        // Check for destination table presence and emptiness for early failure notification.
-        // Note that a presence check can fail if the table or dataset are created by earlier stages
-        // of the pipeline. For these cases the withoutValidation method can be used to disable
-        // the check.
-        // Unfortunately we can't validate anything early if tableRefFunction is specified.
-        if (table != null && validate) {
+          // Check for destination table presence and emptiness for early failure notification.
+          // Note that a presence check can fail when the table or dataset is created by an earlier
+          // stage of the pipeline. For these cases the #withoutValidation method can be used to
+          // disable the check.
           verifyDatasetPresence(options, table);
           if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
             verifyTablePresence(options, table);
@@ -972,16 +970,16 @@ public void validate(PCollection<TableRow> input) {
         }
 
         if (options.isStreaming() || tableRefFunction != null) {
-          // We will use BigQuery's streaming write API -- validate support dispositions.
-          if (createDisposition == CreateDisposition.CREATE_NEVER) {
-            throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
-                + "supported for unbounded PCollections or when using tablespec functions.");
-          }
+          // We will use BigQuery's streaming write API -- validate supported dispositions.
+          checkArgument(
+              createDisposition != CreateDisposition.CREATE_NEVER,
+              "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
+                  + " using a tablespec function.");
 
-          if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
-            throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
-                + "supported for unbounded PCollections or when using tablespec functions.");
-          }
+          checkArgument(
+              writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+              "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+                  + " when using a tablespec function.");
         } else {
           // We will use a BigQuery load job -- validate the temp location.
           String tempLocation = options.getTempLocation();
@@ -1012,13 +1010,17 @@ public PDone apply(PCollection<TableRow> input) {
           return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
         }
 
+        TableReference table = fromJsonString(jsonTableRef, TableReference.class);
+        if (Strings.isNullOrEmpty(table.getProjectId())) {
+          table.setProjectId(options.getProject());
+        }
         String jobIdToken = UUID.randomUUID().toString();
         String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken;
         BigQueryServices bqServices = getBigQueryServices();
         return input.apply("Write", org.apache.beam.sdk.io.Write.to(
             new BigQuerySink(
                 jobIdToken,
-                jsonTableRef,
+                table,
                 jsonSchema,
                 getWriteDisposition(),
                 getCreateDisposition(),
@@ -1047,7 +1049,8 @@ public TableSchema getSchema() {
         return fromJsonString(jsonSchema, TableSchema.class);
       }
 
-      /** Returns the table reference, or {@code null} if a . */
+      /** Returns the table reference, or {@code null}. */
+      @Nullable
      public TableReference getTable() {
        return fromJsonString(jsonTableRef, TableReference.class);
      }
@@ -1086,7 +1089,7 @@ static class BigQuerySink extends FileBasedSink<TableRow> {
 
     public BigQuerySink(
         String jobIdToken,
-        @Nullable String jsonTable,
+        @Nullable TableReference table,
         @Nullable String jsonSchema,
         WriteDisposition writeDisposition,
         CreateDisposition createDisposition,
@@ -1095,7 +1098,13 @@ public BigQuerySink(
         BigQueryServices bqServices) {
       super(tempFile, ".json");
       this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
-      this.jsonTable = jsonTable;
+      if (table == null) {
+        this.jsonTable = null;
+      } else {
+        checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
+            "Table %s should have a project specified", table);
+        this.jsonTable = toJsonString(table);
+      }
       this.jsonSchema = jsonSchema;
       this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
       this.createDisposition = checkNotNull(createDisposition, "createDisposition");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index b9af1e2f3c1b..e1f8e4d9099a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -31,6 +31,8 @@
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
 import org.apache.beam.sdk.util.BigQueryServices.Status;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -339,7 +341,7 @@ public void testCustomSink() throws Exception {
             new TableRow().set("name", "b").set("number", 2),
             new TableRow().set("name", "c").set("number", 3)))
         .setCoder(TableRowJsonCoder.of())
-        .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+        .apply(BigQueryIO.Write.to("dataset-id.table-id")
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
             .withSchema(new TableSchema().setFields(
                 ImmutableList.of(
@@ -604,4 +606,40 @@ public void testBigQueryIOGetName() {
     assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
     assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
   }
+
+  @Test
+  public void testWriteValidateFailsCreateNoSchema() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("no schema was provided");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write
+            .to("dataset.table")
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
+  }
+
+  @Test
+  public void testWriteValidateFailsTableAndTableSpec() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Cannot set both a table reference and a table function");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write
+            .to("dataset.table")
+            .to(new SerializableFunction<BoundedWindow, String>() {
+              @Override
+              public String apply(BoundedWindow input) {
+                return null;
+              }
+            }));
+  }
+
+  @Test
+  public void testWriteValidateFailsNoTableAndNoTableSpec() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write.named("name"));
+  }
 }
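
Note on the validation refactoring above: the hand-rolled if/throw blocks are replaced with
Guava's Preconditions helpers, which this change imports via checkState. A minimal sketch of
the equivalence, for readers unfamiliar with the idiom (conditions copied from the diff):

    // From com.google.common.base.Preconditions:
    // checkState(expr, msg) throws IllegalStateException when expr is false --
    // used here for misconfiguration of the transform itself (table vs. tablespec function).
    if (!(jsonTableRef != null || tableRefFunction != null)) {
      throw new IllegalStateException(
          "must set the table reference of a BigQueryIO.Write transform");
    }
    // checkArgument(expr, msg) throws IllegalArgumentException when expr is false --
    // used here for unsupported combinations such as CREATE_IF_NEEDED without a schema.
    if (!(createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null)) {
      throw new IllegalArgumentException(
          "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
    }

This is why the new tests expect IllegalStateException for the table/tablespec checks but
IllegalArgumentException for the missing-schema check.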
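
Note on the testCustomSink change from "project-id:dataset-id.table-id" to "dataset-id.table-id":
BigQuerySink now rejects a TableReference without a project (the checkArgument in its
constructor), so apply() must resolve the default project first. A sketch of that resolution,
assuming BigQueryIO.parseTableSpec is the tablespec parser behind Write.to(String) and Strings
is Guava's com.google.common.base.Strings:

    // "dataset-id.table-id" parses to a TableReference whose projectId is null;
    // before constructing the sink, the write falls back to the project
    // configured in BigQueryOptions, mirroring the defaulting in validate().
    TableReference table = BigQueryIO.parseTableSpec("dataset-id.table-id");
    if (Strings.isNullOrEmpty(table.getProjectId())) {
      table.setProjectId(options.getProject());
    }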