Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
597523f
chore: migrate BigtableIO to use Veneer
mutianf Sep 25, 2022
5b27c90
address comments
mutianf Dec 12, 2022
bb579f6
refactor setting configurations for veneer
mutianf Feb 3, 2023
f284ca0
fix client wrapper
mutianf Feb 3, 2023
eb75c07
Merge branch 'master' into migration
mutianf Feb 10, 2023
0bc8e24
Merge branch 'master' into migration
mutianf Feb 10, 2023
9721a1a
use shared clients
mutianf Feb 10, 2023
f341ab9
revert clientwrapper changes
mutianf Feb 10, 2023
33ae4ab
refactor
mutianf Feb 15, 2023
d91ef0c
Merge branch 'master' into migration
mutianf Feb 15, 2023
a1dc6b5
clean up comments
mutianf Feb 16, 2023
5b50bd1
address comments part 1
mutianf Feb 17, 2023
658a4fd
address comment p2, rewrite RowAdaptor
mutianf Feb 17, 2023
7fc9f00
address comments p3, move to GcpCredentialFactory and clean up servic…
mutianf Feb 17, 2023
42b3e17
a new round of comments
mutianf Feb 17, 2023
f88b300
more small fixes
mutianf Feb 17, 2023
5b818ff
address more comments, make doc better
mutianf Feb 17, 2023
2997ed3
Merge branch 'master' into migration
mutianf Feb 17, 2023
7da3039
fix IT test
mutianf Feb 18, 2023
13cea7d
Merge branch 'master' into migration
mutianf Feb 19, 2023
f2332bc
update WriteOptions api
mutianf Feb 21, 2023
5bad27c
Fix Nonnull annotations and add Nullable annotations
mutianf Feb 21, 2023
97e96e8
add 2 tests on credentials, update CHANGES.md
mutianf Feb 22, 2023
c873135
Merge branch 'master' into migration
mutianf Feb 22, 2023
9eca668
use as(GcpOptions.class) instead of casting, fixing tests
mutianf Feb 22, 2023
2aeb151
nit move set emulator = true up
mutianf Feb 22, 2023
0bc1995
revert credential changes, piping channel count
mutianf Feb 27, 2023
b08fcf1
remove bigtable change from CHANGES.md
mutianf Feb 27, 2023
ce231c4
fix nits
mutianf Mar 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version
google_cloud_bigtable : "com.google.cloud:google-cloud-bigtable", // google_cloud_platform_libraries_bom sets version
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.26.3",
google_cloud_bigtable_client_core_config : "com.google.cloud.bigtable:bigtable-client-core-config:1.28.0",
google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.137.1",
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
google_cloud_core_grpc : "com.google.cloud:google-cloud-core-grpc", // google_cloud_platform_libraries_bom sets version
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/extensions/sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ dependencies {
testImplementation library.java.quickcheck_core
testImplementation library.java.testcontainers_kafka
testImplementation library.java.google_cloud_bigtable
testImplementation library.java.google_cloud_bigtable_client_core
testImplementation library.java.google_cloud_bigtable_client_core_config
testImplementation library.java.google_cloud_bigtable_emulator
testImplementation library.java.proto_google_cloud_bigtable_admin_v2
testImplementation library.java.proto_google_cloud_datastore_v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,45 @@
import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteString;
import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteStringUtf8;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auth.Credentials;
import com.google.bigtable.admin.v2.ColumnFamily;
import com.google.bigtable.admin.v2.DeleteTableRequest;
import com.google.bigtable.admin.v2.Table;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.Mutation;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.CredentialOptions;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.io.IOException;
import java.io.Serializable;
import org.checkerframework.checker.nullness.qual.Nullable;

class BigtableClientWrapper implements Serializable {
private final BigtableTableAdminClient tableAdminClient;
private final BigtableDataClient dataClient;
private final BigtableSession session;
private final BigtableOptions bigtableOptions;

BigtableClientWrapper(
String project,
String instanceId,
@Nullable Integer emulatorPort,
@Nullable Credentials gcpCredentials)
throws IOException {
BigtableOptions.Builder optionsBuilder =
BigtableOptions.builder()
BigtableDataSettings.Builder settings =
BigtableDataSettings.newBuilderForEmulator(emulatorPort)
.setProjectId(project)
.setInstanceId(instanceId)
.setUserAgent("apache-beam-test");
if (emulatorPort != null) {
optionsBuilder.enableEmulator("localhost", emulatorPort);
}
if (gcpCredentials != null) {
optionsBuilder.setCredentialOptions(CredentialOptions.credential(gcpCredentials));
}
bigtableOptions = optionsBuilder.build();
.setCredentialsProvider(FixedCredentialsProvider.create(gcpCredentials));

session = new BigtableSession(bigtableOptions);
tableAdminClient = session.getTableAdminClient();
dataClient = session.getDataClient();
settings
.stubSettings()
.setHeaderProvider(FixedHeaderProvider.create("user-agent", "apache-beam-test"));
dataClient = BigtableDataClient.create(settings.build());
BigtableTableAdminSettings tableSettings =
BigtableTableAdminSettings.newBuilderForEmulator(emulatorPort)
.setProjectId(project)
.setInstanceId(instanceId)
.build();
tableAdminClient = BigtableTableAdminClient.create(tableSettings);
}

void writeRow(
Expand All @@ -72,44 +68,24 @@ void writeRow(
String columnQualifier,
byte[] value,
long timestampMicros) {
Mutation.SetCell setCell =
Mutation.SetCell.newBuilder()
.setFamilyName(familyColumn)
.setColumnQualifier(byteStringUtf8(columnQualifier))
.setValue(byteString(value))
.setTimestampMicros(timestampMicros)
.build();
Mutation mutation = Mutation.newBuilder().setSetCell(setCell).build();
MutateRowRequest mutateRowRequest =
MutateRowRequest.newBuilder()
.setRowKey(byteStringUtf8(key))
.setTableName(bigtableOptions.getInstanceName().toTableNameStr(table))
.addMutations(mutation)
.build();
dataClient.mutateRow(mutateRowRequest);
RowMutation rowMutation =
RowMutation.create(table, key)
.setCell(
familyColumn, byteStringUtf8(columnQualifier), timestampMicros, byteString(value));
dataClient.mutateRow(rowMutation);
}

void createTable(String tableName, String familyName) {
Table.Builder tableBuilder = Table.newBuilder();
tableBuilder.putColumnFamilies(familyName, ColumnFamily.newBuilder().build());

String instanceName = bigtableOptions.getInstanceName().toString();
com.google.bigtable.admin.v2.CreateTableRequest.Builder createTableRequestBuilder =
com.google.bigtable.admin.v2.CreateTableRequest.newBuilder()
.setParent(instanceName)
.setTableId(tableName)
.setTable(tableBuilder.build());
tableAdminClient.createTable(createTableRequestBuilder.build());
CreateTableRequest createTableRequest = CreateTableRequest.of(tableName).addFamily(familyName);
tableAdminClient.createTable(createTableRequest);
}

void deleteTable(String tableId) {
final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
DeleteTableRequest.Builder deleteTableRequestBuilder =
DeleteTableRequest.newBuilder().setName(tableName);
tableAdminClient.deleteTable(deleteTableRequestBuilder.build());
tableAdminClient.deleteTable(tableId);
}

void closeSession() throws IOException {
session.close();
dataClient.close();
tableAdminClient.close();
}
}
6 changes: 1 addition & 5 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ dependencies {
implementation library.java.google_auth_library_credentials
implementation library.java.google_auth_library_oauth2_http
implementation library.java.google_cloud_bigquery_storage
implementation(library.java.google_cloud_bigtable_client_core) {
exclude group: 'io.grpc', module: 'grpc-core' // Use Beam's version
exclude group: 'io.grpc', module: 'grpc-grpclb'
}
implementation(library.java.google_cloud_bigtable_client_core_config)
// google_cloud_bigtable_client_core declares old google-cloud-bigtable for
// Java7 compatibility. The old google-cloud-bigtable is not compatible with
// newer version of GAX. Declaring newer google-cloud-bigtable so that Beam
Expand Down Expand Up @@ -133,7 +130,6 @@ dependencies {
implementation library.java.netty_tcnative_boringssl_static
permitUnusedDeclared library.java.netty_tcnative_boringssl_static // BEAM-11761
implementation library.java.proto_google_cloud_bigquery_storage_v1
implementation library.java.proto_google_cloud_bigtable_admin_v2
implementation library.java.proto_google_cloud_bigtable_v2
implementation library.java.proto_google_cloud_datastore_v1
implementation library.java.proto_google_cloud_firestore_v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@

import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.CredentialOptions;
import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
Expand All @@ -47,9 +46,6 @@ public abstract class BigtableConfig implements Serializable {
/** Returns the instance id being written to. */
public abstract @Nullable ValueProvider<String> getInstanceId();

/** Returns the table being read from. */
public abstract @Nullable ValueProvider<String> getTableId();

/** Returns the app profile being read from. */
public abstract @Nullable ValueProvider<String> getAppProfileId();

Expand All @@ -68,12 +64,21 @@ public abstract class BigtableConfig implements Serializable {
/** Weather validate that table exists before writing. */
abstract boolean getValidate();

/** {@link BigtableService} used only for testing. */
abstract @Nullable BigtableService getBigtableService();

/** Bigtable emulator. Used only for testing. */
/** Bigtable emulator. */
abstract @Nullable String getEmulatorHost();

/** User agent for this job. */
abstract @Nullable String getUserAgent();

/**
* Credentials for running the job. Use the default credentials in {@link GcpOptions} if it's not
* set.
*/
abstract @Nullable CredentialFactory getCredentialFactory();

/** Get number of channels. */
abstract @Nullable Integer getChannelCount();

abstract Builder toBuilder();

static BigtableConfig.Builder builder() {
Expand All @@ -87,23 +92,27 @@ abstract static class Builder {

abstract Builder setInstanceId(ValueProvider<String> instanceId);

abstract Builder setTableId(ValueProvider<String> tableId);

abstract Builder setAppProfileId(ValueProvider<String> appProfileId);

/** @deprecated will be replaced by bigtable options configurator. */
/** @deprecated please set the options directly in BigtableIO. */
@Deprecated
abstract Builder setBigtableOptions(BigtableOptions options);

abstract Builder setValidate(boolean validate);

/** @deprecated please set the options directly in BigtableIO. */
@Deprecated
abstract Builder setBigtableOptionsConfigurator(
SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> optionsConfigurator);

abstract Builder setBigtableService(BigtableService bigtableService);

abstract Builder setEmulatorHost(String emulatorHost);

abstract Builder setUserAgent(String userAgent);

abstract Builder setCredentialFactory(CredentialFactory credentialFactory);

abstract Builder setChannelCount(int count);

abstract BigtableConfig build();
}

Expand All @@ -117,23 +126,20 @@ public BigtableConfig withInstanceId(ValueProvider<String> instanceId) {
return toBuilder().setInstanceId(instanceId).build();
}

public BigtableConfig withTableId(ValueProvider<String> tableId) {
checkArgument(tableId != null, "tableId can not be null");
return toBuilder().setTableId(tableId).build();
}

public BigtableConfig withAppProfileId(ValueProvider<String> appProfileId) {
checkArgument(appProfileId != null, "tableId can not be null");
BigtableConfig withAppProfileId(ValueProvider<String> appProfileId) {
checkArgument(appProfileId != null, "App profile id can not be null");
return toBuilder().setAppProfileId(appProfileId).build();
}

/** @deprecated will be replaced by bigtable options configurator. */
/** @deprecated please set the options directly in BigtableIO. */
@Deprecated
public BigtableConfig withBigtableOptions(BigtableOptions options) {
checkArgument(options != null, "Bigtable options can not be null");
return toBuilder().setBigtableOptions(options).build();
}

/** @deprecated please set the options directly in BigtableIO. */
@Deprecated
public BigtableConfig withBigtableOptionsConfigurator(
SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) {
checkArgument(configurator != null, "configurator can not be null");
Expand All @@ -144,23 +150,13 @@ public BigtableConfig withValidate(boolean isEnabled) {
return toBuilder().setValidate(isEnabled).build();
}

@VisibleForTesting
public BigtableConfig withBigtableService(BigtableService bigtableService) {
checkArgument(bigtableService != null, "bigtableService can not be null");
return toBuilder().setBigtableService(bigtableService).build();
}

@VisibleForTesting
public BigtableConfig withEmulator(String emulatorHost) {
checkArgument(emulatorHost != null, "emulatorHost can not be null");
return toBuilder().setEmulatorHost(emulatorHost).build();
}

void validate() {
checkArgument(
getTableId() != null && (!getTableId().isAccessible() || !getTableId().get().isEmpty()),
"Could not obtain Bigtable table id");

checkArgument(
(getProjectId() != null
&& (!getProjectId().isAccessible() || !getProjectId().get().isEmpty()))
Expand All @@ -184,11 +180,9 @@ void populateDisplayData(DisplayData.Builder builder) {
DisplayData.item("projectId", getProjectId()).withLabel("Bigtable Project Id"))
.addIfNotNull(
DisplayData.item("instanceId", getInstanceId()).withLabel("Bigtable Instance Id"))
.addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
.addIfNotNull(
DisplayData.item("appProfileId", getAppProfileId())
.withLabel("Bigtable App Profile Id"))
.add(DisplayData.item("withValidation", getValidate()).withLabel("Check is table exists"));
.withLabel("Bigtable App Profile Id"));

if (getBigtableOptions() != null) {
builder.add(
Expand All @@ -197,81 +191,20 @@ void populateDisplayData(DisplayData.Builder builder) {
}
}

/**
* Helper function that either returns the mock Bigtable service supplied by {@link
* #withBigtableService} or creates and returns an implementation that talks to {@code Cloud
* Bigtable}.
*
* <p>Also populate the credentials option from {@link GcpOptions#getGcpCredential()} if the
* default credentials are being used on {@link BigtableOptions}.
*/
@VisibleForTesting
BigtableService getBigtableService(PipelineOptions pipelineOptions) {
if (getBigtableService() != null) {
return getBigtableService();
}

BigtableOptions.Builder bigtableOptions = effectiveUserProvidedBigtableOptions();

bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());

if (bigtableOptions.build().getCredentialOptions().getCredentialType()
== CredentialOptions.CredentialType.DefaultCredentials) {
bigtableOptions.setCredentialOptions(
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
}

return new BigtableServiceImpl(bigtableOptions.build());
}

boolean isDataAccessible() {
return getTableId().isAccessible()
&& (getProjectId() == null || getProjectId().isAccessible())
&& (getInstanceId() == null || getInstanceId().isAccessible());
}

private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
BigtableOptions.Builder effectiveOptions =
getBigtableOptions() != null
? getBigtableOptions().toBuilder()
: new BigtableOptions.Builder();

if (getBigtableOptionsConfigurator() != null) {
effectiveOptions = getBigtableOptionsConfigurator().apply(effectiveOptions);
}

// Default option that should be forced in most cases
effectiveOptions.setUseCachedDataPool(true);

if (getInstanceId() != null) {
effectiveOptions.setInstanceId(getInstanceId().get());
}

if (getProjectId() != null) {
effectiveOptions.setProjectId(getProjectId().get());
}

if (getEmulatorHost() != null) {
effectiveOptions.enableEmulator(getEmulatorHost());
effectiveOptions.setUseCachedDataPool(false);
}

return effectiveOptions;
return (getProjectId() == null || getProjectId().isAccessible())
&& (getInstanceId() == null || getInstanceId().isAccessible())
&& (getAppProfileId() == null || getAppProfileId().isAccessible());
}

@Override
public final String toString() {
return MoreObjects.toStringHelper(BigtableConfig.class)
.add("projectId", getProjectId())
.add("instanceId", getInstanceId())
.add("tableId", getTableId())
.add("appProfileId", getAppProfileId())
.add(
"bigtableOptionsConfigurator",
getBigtableOptionsConfigurator() == null
? null
: getBigtableOptionsConfigurator().getClass().getName())
.add("options", getBigtableOptions())
.add("userAgent", getUserAgent())
.add("emulator", getEmulatorHost())
.toString();
}
}
Loading