diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 62022b219c2a..989e3a1e3193 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -420,7 +420,7 @@ public Long create(PipelineOptions options) { *

Beam will infer the secret type and value based on the secret itself. This guarantees that * any data at rest during the performing a GBK, so this can be used to guarantee that data is not * unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The - * option should be structured like: + * secret should be a url safe base64 encoded 32 byte value. The option should be structured like: * *


    * --gbek=type:;:
@@ -432,14 +432,19 @@ public Long create(PipelineOptions options) {
    * --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
    * 
* - * All variables should use snake case to allow consistency across languages. + * All variables should use snake case to allow consistency across languages. For an example of + * generating a properly formatted secret, see + * https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82 */ @Description( "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + " infer the secret type and value based on the secret itself. This guarantees that" + " any data at rest during the performing a GBK, so this can be used to guarantee" + " that data is not unencrypted. Runners with this behavior include the Dataflow," - + " Flink, and Spark runners. The option should be structured like:" + + " Flink, and Spark runners. The secret should be a url safe base64 encoded 32 byte" + + " value. For an example of generating a properly formatted secret, see" + + " https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82" + + " When passing in the gbek option, it should be structured like:" + " --gbek=type:;:, for example " + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + " should use snake case to allow consistency across languages.") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 6ed0a31b3b95..1f4b7535d89e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -39,8 +39,8 @@ * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. * - *

The transform requires a {@link Secret} which returns a 32 byte secret which can be used to - * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + *

The transform requires a {@link Secret} which returns a base64 encoded 32 byte secret which + * can be used to generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. * *

Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) @@ -153,7 +153,7 @@ private static class EncryptMessage extends DoFn, KV public void setup() { try { this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); - this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES"); + this.secretKeySpec = + new SecretKeySpec( + java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes()), "AES"); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index ba4c50e5a41e..3a2fc2f08c04 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -58,7 +58,7 @@ public class GroupByEncryptedKeyTest implements Serializable { private static class FakeSecret implements Secret { private final byte[] secret = - "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset()); + "YUt3STJQbXFZRnQycDV0TktDeUJTNXFZV0hoSHNHWmM".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { @@ -123,7 +123,10 @@ public static void setup() throws IOException { byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + secretName, + SecretPayload.newBuilder() + .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .build()); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 60477a4c242f..141d6dae64b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -82,7 +82,10 @@ public static void setup() throws IOException { byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + secretName, + SecretPayload.newBuilder() + .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 326da99f1a81..d9a3e3ed20d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -153,7 +153,9 @@ public static void setup() throws IOException { new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( secretName, - SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + SecretPayload.newBuilder() + .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 2d3b8b49d8d7..3fc5151156f1 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1726,8 +1726,12 @@ def _add_argparse_args(cls, parser): 'secret itself. This guarantees that any data at rest during the ' 'GBK will be encrypted. Many runners only store data at rest when ' 'performing a GBK, so this can be used to guarantee that data is ' - 'not unencrypted. Runners with this behavior include the ' - 'Dataflow, Flink, and Spark runners. The option should be ' + 'not unencrypted. The secret should be a url safe base64 encoded ' + '32 byte value. To generate a secret in this format, you can use ' + 'Secret.generate_secret_bytes(). For an example of this, see ' + 'https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/python/apache_beam/transforms/util_test.py#L356. ' # pylint: disable=line-too-long + 'Runners with this behavior include the Dataflow, ' + 'Flink, and Spark runners. The option should be ' 'structured like: ' '--gbek=type:;:, for example ' '--gbek=type:GcpSecret;version_name:my_secret/versions/latest'))