Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
sdks/python/apache_beam/portability/api/*pb2*.*
sdks/python/apache_beam/portability/common_urns.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to have a generated file? Can't you just reflectively and lazily generate the module/class/constants? (whichever is easiest)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but in that case we'd have to copy/distribute the .md file for pypi anyways (as most users won't be running this from within the github tree) so I figured this approach is easier. (Also has advantages for IDEs, and is similar to what we're doing for proto files and will want to do for Java.)


# Ignore IntelliJ files.
**/.idea/**/*
Expand Down
132 changes: 132 additions & 0 deletions model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->

# Apache Beam URNs

This file serves as a central place to enumerate and document the various
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe YAML would allow easier parsing and machine-readable association of metadata and commentary with the URNs? This file already suggests a comment and payload field for general description and a machine-readable spec for the payload.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we made the comment and payload fields machine-readable, there's not much automated we could do with them. The greater need is to unify and document these urns, which is why I chose markdown (for easy human production and consumption).

URNs used in the Beam portability APIs.


## Core Transforms

### urn:beam:transform:pardo:v1

TODO(BEAM-3595): Change this to beam:transform:pardo:v1.

Represents Beam's parallel do operation.

Payload: A serialized ParDoPayload proto.

### beam:transform:group_by_key:v1

Represents Beam's group-by-key operation.

Payload: None

### beam:transform:window_into:v1

Payload: A windowing strategy id.

### beam:transform:flatten:v1

### beam:transform:read:v1


## Combining

If any of the combine operations are produced by an SDK, it is assumed that
the SDK understands the last three combine helper operations.

### beam:transform:combine_globally:v1

### beam:transform:combine_per_key:v1

### beam:transform:combine_grouped_values:v1

### beam:transform:combine_pgbkcv:v1

### beam:transform:combine_merge_accumulators:v1

### beam:transform:combine_extract_outputs:v1


## Other common transforms

### beam:transform:reshuffle:v1


## WindowFns

### beam:windowfn:global_windows:v0.1

TODO(BEAM-3595): Change this to beam:windowfn:global_windows:v1

### beam:windowfn:fixed_windows:v0.1

TODO(BEAM-3595): Change this to beam:windowfn:fixed_windows:v1

### beam:windowfn:sliding_windows:v0.1

TODO(BEAM-3595): Change this to beam:windowfn:sliding_windows:v1

### beam:windowfn:session_windows:v0.1

TODO(BEAM-3595): Change this to beam:windowfn:session_windows:v1


## Coders

### beam:coder:bytes:v1

Components: None

### beam:coder:varint:v1

Components: None

### beam:coder:kv:v1

Components: The key and value coder, in that order.

### beam:coder:iterable:v1

Encodes an iterable of elements.

Components: Coder for a single element.

## Internal coders

The following coders are typically not specified by manually by the user,
but are used at runtime and must be supported by every SDK.

### beam:coder:length_prefix:v1

### beam:coder:global_window:v1

### beam:coder:interval_window:v1

### beam:coder:windowed_value:v1


## Side input access

### beam:side_input:iterable:v1

### beam:side_input:multimap:v1

1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,7 @@
<include>**/sdks/python/NOTICE</include>
<include>**/sdks/python/README.md</include>
<include>**/sdks/python/apache_beam/portability/api/*pb2*.*</include>
<include>**/sdks/python/apache_beam/portability/common_urns.py</include>
<include>**/sdks/python/**/*.c</include>
<include>**/sdks/python/**/*.so</include>
<include>**/sdks/python/**/*.egg</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.beam.runners.core.construction;

import static org.apache.beam.runners.core.construction.UrnUtils.validateCommonUrn;

import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
Expand All @@ -41,14 +43,14 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
@VisibleForTesting
static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS =
ImmutableBiMap.<Class<? extends Coder>, String>builder()
.put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
.put(KvCoder.class, "urn:beam:coders:kv:0.1")
.put(VarLongCoder.class, "urn:beam:coders:varint:0.1")
.put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1")
.put(IterableCoder.class, "urn:beam:coders:stream:0.1")
.put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1")
.put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
.put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
.put(ByteArrayCoder.class, validateCommonUrn("beam:coder:bytes:v1"))
.put(KvCoder.class, validateCommonUrn("beam:coder:kv:v1"))
.put(VarLongCoder.class, validateCommonUrn("beam:coder:varint:v1"))
.put(IntervalWindowCoder.class, validateCommonUrn("beam:coder:interval_window:v1"))
.put(IterableCoder.class, validateCommonUrn("beam:coder:iterable:v1"))
.put(LengthPrefixCoder.class, validateCommonUrn("beam:coder:length_prefix:v1"))
.put(GlobalWindow.Coder.class, validateCommonUrn("beam:coder:global_window:v1"))
.put(FullWindowedValueCoder.class, validateCommonUrn("beam:coder:windowed_value:v1"))
.build();

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.runners.core.construction.UrnUtils.validateCommonUrn;

import com.google.auto.value.AutoValue;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -51,27 +52,34 @@
*/
public class PTransformTranslation {

public static final String PAR_DO_TRANSFORM_URN = "urn:beam:transform:pardo:v1";
public static final String FLATTEN_TRANSFORM_URN = "urn:beam:transform:flatten:v1";
public static final String GROUP_BY_KEY_TRANSFORM_URN = "urn:beam:transform:groupbykey:v1";
public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1";
public static final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1";
public static final String PAR_DO_TRANSFORM_URN =
validateCommonUrn("urn:beam:transform:pardo:v1");
public static final String FLATTEN_TRANSFORM_URN =
validateCommonUrn("beam:transform:flatten:v1");
public static final String GROUP_BY_KEY_TRANSFORM_URN =
validateCommonUrn("beam:transform:group_by_key:v1");
public static final String READ_TRANSFORM_URN =
validateCommonUrn("beam:transform:read:v1");
public static final String WINDOW_TRANSFORM_URN =
validateCommonUrn("beam:transform:window_into:v1");
public static final String TEST_STREAM_TRANSFORM_URN = "urn:beam:transform:teststream:v1";

// Not strictly a primitive transform
public static final String COMBINE_TRANSFORM_URN = "urn:beam:transform:combine:v1";
public static final String COMBINE_TRANSFORM_URN =
validateCommonUrn("beam:transform:combine_per_key:v1");

public static final String RESHUFFLE_URN = "urn:beam:transform:reshuffle:v1";
public static final String RESHUFFLE_URN =
validateCommonUrn("beam:transform:reshuffle:v1");

// Less well-known. And where shall these live?
public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
public static final String WRITE_FILES_TRANSFORM_URN = "beam:transform:write_files:0.1";

/**
* @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
* part of the translation for a `ParDo` side input.
*/
@Deprecated
public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
public static final String CREATE_VIEW_TRANSFORM_URN = "beam:transform:create_view:v1";

private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Utilities for dealing with URNs. */
public class UrnUtils {

private static final String STANDARD_URNS_PATH = "/org/apache/beam/model/common_urns.md";
private static final Pattern URN_REGEX = Pattern.compile("\\b(urn:)?beam:\\S+:v[0-9.]+");
private static final Set<String> COMMON_URNS = extractUrnsFromPath(STANDARD_URNS_PATH);

private static Set<String> extractUrnsFromPath(String path) {
String contents;
try {
contents = CharStreams.toString(new InputStreamReader(
UrnUtils.class.getResourceAsStream(path)));
} catch (IOException exn) {
throw new RuntimeException(exn);
}
Set<String> urns = new HashSet<>();
Matcher m = URN_REGEX.matcher(contents);
while (m.find()) {
urns.add(m.group());
}
return urns;
}

public static String validateCommonUrn(String urn) {
if (!URN_REGEX.matcher(urn).matches()) {
throw new IllegalArgumentException(
String.format("'%s' does not match '%s'", urn, URN_REGEX));
}
if (!COMMON_URNS.contains(urn)) {
throw new IllegalArgumentException(
String.format("'%s' is not found in '%s'", urn, STANDARD_URNS_PATH));
}
return urn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime.
public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1";
public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1";
public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1";
static {
// Out-of-line to facilitate use in the case statements below.
UrnUtils.validateCommonUrn(GLOBAL_WINDOWS_FN);
UrnUtils.validateCommonUrn(FIXED_WINDOWS_FN);
UrnUtils.validateCommonUrn(SLIDING_WINDOWS_FN);
UrnUtils.validateCommonUrn(SESSION_WINDOWS_FN);
}
// This URN says that the WindowFn is just a UDF blob the Java SDK understands
// TODO: standardize such things
public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core.construction;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import org.junit.Test;

/**
* Tests for UrnUtils.
*/
public class UrnUtilsTest {

private static final String GOOD_URN = "beam:coder:bytes:v1";
private static final String MISSING_URN = "beam:fake:v1";
private static final String BAD_URN = "Beam";

@Test
public void testGoodUrnSuccedes() {
assertEquals(GOOD_URN, UrnUtils.validateCommonUrn(GOOD_URN));
}

@Test
public void testMissingUrnFails() {
try {
UrnUtils.validateCommonUrn(MISSING_URN);
fail("Should have rejected " + MISSING_URN);
} catch (IllegalArgumentException exn) {
// expected
}
}

@Test
public void testBadUrnFails() {
try {
UrnUtils.validateCommonUrn(BAD_URN);
fail("Should have rejected " + BAD_URN);
} catch (IllegalArgumentException exn) {
// expected
}
}
}
Loading