Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public Long create(PipelineOptions options) {
* <p>Beam will infer the secret type and value based on the secret itself. This guarantees that
* any data at rest during the GBK will be encrypted. Many runners only store data at rest when
* performing a GBK, so this can be used to guarantee that data is not unencrypted. Runners with
* this behavior include the Dataflow, Flink, and Spark runners. The
* option should be structured like:
* secret should be a url safe base64 encoded 32 byte value. The option should be structured like:
*
* <pre><code>
* --gbek=type:<secret_type>;<secret_param>:<value>
Expand All @@ -432,14 +432,19 @@ public Long create(PipelineOptions options) {
* --gbek=type:GcpSecret;version_name:my_secret/versions/latest
* </code></pre>
*
* All variables should use snake case to allow consistency across languages.
* All variables should use snake case to allow consistency across languages. For an example of
* generating a properly formatted secret, see
* https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82
*/
@Description(
"When set, will replace all GroupByKey transforms in the pipeline the option. Beam will"
+ " infer the secret type and value based on the secret itself. This guarantees that"
+ " any data at rest during the performing a GBK, so this can be used to guarantee"
+ " that data is not unencrypted. Runners with this behavior include the Dataflow,"
+ " Flink, and Spark runners. The option should be structured like:"
+ " Flink, and Spark runners. The secret should be a url safe base64 encoded 32 byte"
+ " value. For an example of generating a properly formatted secret, see"
+ " https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82"
+ " When passing in the gbek option, it should be structured like:"
+ " --gbek=type:<secret_type>;<secret_param>:<value>, for example "
+ " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables "
+ " should use snake case to allow consistency across languages.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* the output. This is useful when the keys contain sensitive data that should not be stored at rest
* by the runner.
*
* <p>The transform requires a {@link Secret} which returns a 32 byte secret which can be used to
* generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm.
* <p>The transform requires a {@link Secret} which returns a base64 encoded 32 byte secret which
* can be used to generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm.
*
* <p>Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this
* does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2)
Expand Down Expand Up @@ -153,7 +153,7 @@ private static class EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], KV<b
@Setup
public void setup() {
try {
byte[] secretBytes = this.hmacKey.getSecretBytes();
byte[] secretBytes = java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes());
this.mac = Mac.getInstance("HmacSHA256");
this.mac.init(new SecretKeySpec(secretBytes, "HmacSHA256"));
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
Expand Down Expand Up @@ -229,7 +229,9 @@ private static class DecryptMessage<K, V>
public void setup() {
try {
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES");
this.secretKeySpec =
new SecretKeySpec(
java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes()), "AES");
} catch (Exception ex) {
throw new RuntimeException(
"Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class GroupByEncryptedKeyTest implements Serializable {

private static class FakeSecret implements Secret {
private final byte[] secret =
"aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset());
"YUt3STJQbXFZRnQycDV0TktDeUJTNXFZV0hoSHNHWmM".getBytes(Charset.defaultCharset());

@Override
public byte[] getSecretBytes() {
Expand Down Expand Up @@ -123,7 +123,10 @@ public static void setup() throws IOException {
byte[] secretBytes = new byte[32];
new SecureRandom().nextBytes(secretBytes);
client.addSecretVersion(
secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
secretName,
SecretPayload.newBuilder()
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
.build());
}
gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ public static void setup() throws IOException {
byte[] secretBytes = new byte[32];
new SecureRandom().nextBytes(secretBytes);
client.addSecretVersion(
secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
secretName,
SecretPayload.newBuilder()
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
.build());
}
gcpSecretVersionName = secretName.toString() + "/versions/latest";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ public static void setup() throws IOException {
new SecureRandom().nextBytes(secretBytes);
client.addSecretVersion(
secretName,
SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build());
SecretPayload.newBuilder()
.setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
.build());
}
gcpSecretVersionName = secretName.toString() + "/versions/latest";
}
Expand Down
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1726,8 +1726,12 @@ def _add_argparse_args(cls, parser):
'secret itself. This guarantees that any data at rest during the '
'GBK will be encrypted. Many runners only store data at rest when '
'performing a GBK, so this can be used to guarantee that data is '
'not unencrypted. Runners with this behavior include the '
'Dataflow, Flink, and Spark runners. The option should be '
'not unencrypted. The secret should be a url safe base64 encoded '
'32 byte value. To generate a secret in this format, you can use '
'Secret.generate_secret_bytes(). For an example of this, see '
'https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/python/apache_beam/transforms/util_test.py#L356. ' # pylint: disable=line-too-long
'Runners with this behavior include the Dataflow, '
'Flink, and Spark runners. The option should be '
'structured like: '
'--gbek=type:<secret_type>;<secret_param>:<value>, for example '
'--gbek=type:GcpSecret;version_name:my_secret/versions/latest'))
Expand Down
Loading