Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 50 additions & 41 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -526,7 +527,7 @@ public void evaluate(
}

/**
* Returns the table to write, or {@code null} if reading from a query instead.
* Returns the table to read, or {@code null} if reading from a query instead.
*/
public TableReference getTable() {
return table;
Expand Down Expand Up @@ -931,37 +932,34 @@ private static void verifyTableEmpty(
public void validate(PCollection<TableRow> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);

TableReference table = getTable();
if (table == null && tableRefFunction == null) {
throw new IllegalStateException(
"must set the table reference of a BigQueryIO.Write transform");
}
if (table != null && tableRefFunction != null) {
throw new IllegalStateException(
"Cannot set both a table reference and a table function for a BigQueryIO.Write "
+ "transform");
}
// Exactly one of the table and table reference can be configured.
checkState(
jsonTableRef != null || tableRefFunction != null,
"must set the table reference of a BigQueryIO.Write transform");
checkState(
jsonTableRef == null || tableRefFunction == null,
"Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ " transform");

if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) {
throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
+ "however no schema was provided.");
}
// Require a schema if creating one or more tables.
checkArgument(
createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");

// The user specified a table.
if (jsonTableRef != null && validate) {
TableReference table = getTable();

if (table != null && table.getProjectId() == null) {
// If user does not specify a project we assume the table to be located in the project
// that owns the Dataflow job.
String projectIdFromOptions = options.getProject();
LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
table.getTableId(), projectIdFromOptions));
table.setProjectId(projectIdFromOptions);
}
// configured in BigQueryOptions.
if (Strings.isNullOrEmpty(table.getProjectId())) {
table.setProjectId(options.getProject());
Copy link
Contributor

@peihe peihe Apr 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove projectIdFromOptions?
The name "projectIdFromOptions" was bad; maybe just use projectId.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, removed the LOG call. We don't know whether it will be run, and we don't log elsewhere.

}

// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail if the table or dataset are created by earlier stages
// of the pipeline. For these cases the withoutValidation method can be used to disable
// the check.
// Unfortunately we can't validate anything early if tableRefFunction is specified.
if (table != null && validate) {
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
// disable the check.
verifyDatasetPresence(options, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
verifyTablePresence(options, table);
Expand All @@ -972,16 +970,16 @@ public void validate(PCollection<TableRow> input) {
}

if (options.isStreaming() || tableRefFunction != null) {
// We will use BigQuery's streaming write API -- validate support dispositions.
if (createDisposition == CreateDisposition.CREATE_NEVER) {
throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
+ "supported for unbounded PCollections or when using tablespec functions.");
}
// We will use BigQuery's streaming write API -- validate supported dispositions.
checkArgument(
createDisposition != CreateDisposition.CREATE_NEVER,
"CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
+ " using a tablespec function.");

if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
+ "supported for unbounded PCollections or when using tablespec functions.");
}
checkArgument(
writeDisposition != WriteDisposition.WRITE_TRUNCATE,
"WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ " when using a tablespec function.");
} else {
// We will use a BigQuery load job -- validate the temp location.
String tempLocation = options.getTempLocation();
Expand Down Expand Up @@ -1012,13 +1010,17 @@ public PDone apply(PCollection<TableRow> input) {
return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
}

TableReference table = fromJsonString(jsonTableRef, TableReference.class);
if (Strings.isNullOrEmpty(table.getProjectId())) {
table.setProjectId(options.getProject());
}
String jobIdToken = UUID.randomUUID().toString();
String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken;
BigQueryServices bqServices = getBigQueryServices();
return input.apply("Write", org.apache.beam.sdk.io.Write.to(
new BigQuerySink(
jobIdToken,
jsonTableRef,
table,
jsonSchema,
getWriteDisposition(),
getCreateDisposition(),
Expand Down Expand Up @@ -1047,7 +1049,8 @@ public TableSchema getSchema() {
return fromJsonString(jsonSchema, TableSchema.class);
}

/** Returns the table reference, or {@code null} if a . */
/**
 * Returns the table reference decoded from the JSON-serialized {@code jsonTableRef}.
 *
 * <p>NOTE(review): presumably {@code null} when no table was configured (for example when a
 * table function is used instead) -- confirm against {@code fromJsonString}.
 */
@Nullable
public TableReference getTable() {
return fromJsonString(jsonTableRef, TableReference.class);
}
Expand Down Expand Up @@ -1086,7 +1089,7 @@ static class BigQuerySink extends FileBasedSink<TableRow> {

public BigQuerySink(
String jobIdToken,
@Nullable String jsonTable,
@Nullable TableReference table,
@Nullable String jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
Expand All @@ -1095,7 +1098,13 @@ public BigQuerySink(
BigQueryServices bqServices) {
super(tempFile, ".json");
this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
this.jsonTable = jsonTable;
if (table == null) {
this.jsonTable = null;
} else {
checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
"Table %s should have a project specified", table);
this.jsonTable = toJsonString(table);
}
this.jsonSchema = jsonSchema;
this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
this.createDisposition = checkNotNull(createDisposition, "createDisposition");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BigQueryServices;
import org.apache.beam.sdk.util.BigQueryServices.Status;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down Expand Up @@ -339,7 +341,7 @@ public void testCustomSink() throws Exception {
new TableRow().set("name", "b").set("number", 2),
new TableRow().set("name", "c").set("number", 3)))
.setCoder(TableRowJsonCoder.of())
.apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
.apply(BigQueryIO.Write.to("dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
ImmutableList.of(
Expand Down Expand Up @@ -604,4 +606,40 @@ public void testBigQueryIOGetName() {
assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
}

@Test
public void testWriteValidateFailsCreateNoSchema() {
  // The pipeline is never run: the failure is expected while the write transform is applied,
  // because CREATE_IF_NEEDED is requested without any schema.
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("no schema was provided");

  TestPipeline pipeline = TestPipeline.create();
  pipeline
      .apply(Create.<TableRow>of())
      .apply(
          BigQueryIO.Write.to("dataset.table")
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
}

@Test
public void testWriteValidateFailsTableAndTableSpec() {
  // Configuring both a fixed table and a per-window table function must be rejected.
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("Cannot set both a table reference and a table function");

  // The function's result is irrelevant here; only its presence matters.
  SerializableFunction<BoundedWindow, String> tableSpecFn =
      new SerializableFunction<BoundedWindow, String>() {
        @Override
        public String apply(BoundedWindow input) {
          return null;
        }
      };

  TestPipeline pipeline = TestPipeline.create();
  pipeline
      .apply(Create.<TableRow>of())
      .apply(BigQueryIO.Write.to("dataset.table").to(tableSpecFn));
}

@Test
public void testWriteValidateFailsNoTableAndNoTableSpec() {
  // A write transform with neither a table nor a table function must be rejected.
  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");

  TestPipeline pipeline = TestPipeline.create();
  pipeline.apply(Create.<TableRow>of()).apply(BigQueryIO.Write.named("name"));
}
}