From 5a5e51ea5fc7467dca90b360c95bdf8be7e05e80 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 5 May 2022 20:26:31 -0700 Subject: [PATCH 1/7] [BEAM-14430] Adding a logical type support for Python callables to Row schema --- .../beam/sdk/schemas/SchemaTranslation.java | 2 + .../schemas/logicaltypes/PythonCallable.java | 56 +++++++++++++++++++ .../schemas/utils/StaticSchemaInference.java | 1 + .../beam/sdk/util/PythonCallableSource.java | 40 +++++++++++++ .../sdk/schemas/SchemaTranslationTest.java | 2 + .../python/PythonExternalTransform.java | 31 +++++++++- ....java => PythonExternalTransformTest.java} | 30 +++++++++- sdks/python/apache_beam/typehints/schemas.py | 25 +++++++++ .../apache_beam/utils/python_callable.py | 41 ++++++++++++++ 9 files changed, 225 insertions(+), 3 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java rename sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/{ExternalPythonTransformTest.java => PythonExternalTransformTest.java} (89%) create mode 100644 sdks/python/apache_beam/utils/python_callable.py diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index e1112b2472de..9a63c2d87819 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType; import org.apache.beam.sdk.util.SerializableUtils; @@ -74,6 +75,7 @@ public class SchemaTranslation { ImmutableMap.>>builder() .put(MicrosInstant.IDENTIFIER, MicrosInstant.class) .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class) + .put(PythonCallable.IDENTIFIER, PythonCallable.class) .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java new file mode 100644 index 000000000000..6bd43cb8ba89 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.util.PythonCallableSource; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A logical type for PythonCallableSource objects. */ +@Experimental(Experimental.Kind.SCHEMAS) +public class PythonCallable implements LogicalType { + public static final String IDENTIFIER = "beam:logical_type:python_callable:v1"; + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public Schema.@Nullable FieldType getArgumentType() { + return null; + } + + @Override + public Schema.FieldType getBaseType() { + return Schema.FieldType.STRING; + } + + @Override + public @NonNull String toBaseType(@NonNull PythonCallableSource input) { + return input.getPythonCallableCode(); + } + + @Override + public @NonNull PythonCallableSource toInputType(@NonNull String base) { + return PythonCallableSource.of(base); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 103405037bed..a1437c2d0ccd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -126,6 +126,7 @@ public static Schema.FieldType fieldFromType( return fieldFromType(type, fieldValueTypeSupplier, new HashMap()); } + // TODO(BEAM-14458): support type inference for logical types private static Schema.FieldType fieldFromType( TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java new file mode 100644 index 000000000000..8875d8982963 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PythonCallableSource.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; + +/** + * A wrapper object storing a Python code that can be evaluated to Python callables in Python SDK. + */ +public class PythonCallableSource implements Serializable { + private final String pythonCallableCode; + + private PythonCallableSource(String pythonCallableCode) { + this.pythonCallableCode = pythonCallableCode; + } + + public static PythonCallableSource of(String pythonCallableCode) { + // TODO(BEAM-14457): check syntactic correctness of Python code if possible + return new PythonCallableSource(pythonCallableCode); + } + + public String getPythonCallableCode() { + return pythonCallableCode; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index f4274de02ea5..9f3f7004e8c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; @@ -132,6 +133,7 @@ public static Iterable data() { Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) .add(Schema.of(Field.of("fixed_bytes", FieldType.logicalType(FixedBytes.of(24))))) .add(Schema.of(Field.of("micros_instant", FieldType.logicalType(new MicrosInstant())))) + .add(Schema.of(Field.of("python_callable", FieldType.logicalType(new PythonCallable())))) .add( Schema.of( Field.of("field_with_option_atomic", FieldType.STRING) diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java index c412acd220ee..30f72429e5e9 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.python; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -64,6 +65,7 @@ public class PythonExternalTransform kwargsMap; + private Map, Schema.FieldType> typeHints; private @Nullable Object @NonNull [] argsArray; private @Nullable Row providedKwargsRow; @@ -72,6 +74,7 @@ private PythonExternalTransform(String fullyQualifiedName, String expansionServi this.fullyQualifiedName = fullyQualifiedName; this.expansionService = expansionService; this.kwargsMap = new TreeMap<>(); + this.typeHints = new HashMap<>(); argsArray = new Object[] {}; } @@ -162,6 +165,26 @@ public PythonExternalTransform withKwargs(Row kwargs) { return this; } + /** + * Specifies the field type of arguments. + * + *

Type hints are especially useful for logical types since type inference does not work well + * for logical types. + * + * @param argType A class object for the argument type. + * @param fieldType A schema field type for the argument. + * @return updated wrapper for the cross-language transform. + */ + public PythonExternalTransform withTypeHint( + java.lang.Class argType, Schema.FieldType fieldType) { + if (typeHints.containsKey(argType)) { + throw new IllegalArgumentException( + String.format("typehint for arg type %s already exists", argType)); + } + typeHints.put(argType, fieldType); + return this; + } + @VisibleForTesting Row buildOrGetKwargsRow() { if (providedKwargsRow != null) { @@ -180,15 +203,17 @@ Row buildOrGetKwargsRow() { // * Java primitives // * Type String // * Type Row - private static boolean isCustomType(java.lang.Class type) { + // * Any Type explicitly annotated by withTypeHint() + private boolean isCustomType(java.lang.Class type) { boolean val = !(ClassUtils.isPrimitiveOrWrapper(type) || type == String.class + || typeHints.containsKey(type) || Row.class.isAssignableFrom(type)); return val; } - // If the custom type has a registered schema, we use that. OTherwise we try to register it using + // If the custom type has a registered schema, we use that. Otherwise, we try to register it using // 'JavaFieldSchema'. private Row convertCustomValue(Object value) { SerializableFunction toRowFunc; @@ -239,6 +264,8 @@ private Schema generateSchemaDirectly( if (field instanceof Row) { // Rows are used as is but other types are converted to proper field types. builder.addRowField(fieldName, ((Row) field).getSchema()); + } else if (typeHints.containsKey(field.getClass())) { + builder.addField(fieldName, typeHints.get(field.getClass())); } else { builder.addField( fieldName, diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java similarity index 89% rename from sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java rename to sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java index 60deebfc6e66..cfe7428ba2e5 100644 --- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java +++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java @@ -27,9 +27,11 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.PythonCallableSource; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -41,7 +43,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class ExternalPythonTransformTest implements Serializable { +public class PythonExternalTransformTest implements Serializable { @Ignore("BEAM-14148") @Test public void trivialPythonTransform() { @@ -184,6 +186,19 @@ public void generateArgsWithCustomType() { assertEquals(456, (int) receivedRow.getRow("field1").getInt32("intField")); } + @Test + public void generateArgsWithTypeHint() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withArgs(PythonCallableSource.of("dummy data")) + .withTypeHint( + PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); + Row receivedRow = transform.buildOrGetArgsRow(); + assertTrue(receivedRow.getValue("field0") instanceof PythonCallableSource); + } + @Test public void generateKwargsEmpty() { PythonExternalTransform transform = @@ -274,6 +289,19 @@ public void generateKwargsWithCustomType() { assertEquals(456, (int) receivedRow.getRow("customField1").getInt32("intField")); } + @Test + public void generateKwargsWithTypeHint() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withKwarg("customField0", PythonCallableSource.of("dummy data")) + .withTypeHint( + PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); + Row receivedRow = transform.buildOrGetKwargsRow(); + assertTrue(receivedRow.getValue("customField0") instanceof PythonCallableSource); + } + @Test public void generateKwargsFromMap() { Map kwargsMap = diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 5a04ba51722b..74feb1466343 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -78,6 +78,7 @@ from apache_beam.typehints.native_type_compatibility import extract_optional_type from apache_beam.typehints.native_type_compatibility import match_is_named_tuple from apache_beam.utils import proto_utils +from apache_beam.utils.python_callable import PythonCallableWithSource from apache_beam.utils.timestamp import Timestamp PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1" @@ -559,3 +560,27 @@ def to_representation_type(self, value): def to_language_type(self, value): # type: (MicrosInstantRepresentation) -> Timestamp return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) + + +@LogicalType.register_logical_type +class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): + @classmethod + def urn(cls): + return "beam:logical_type:python_callable:v1" + + @classmethod + def representation_type(cls): + # type: () -> type + return str + + @classmethod + def language_type(cls): + return PythonCallableWithSource + + def to_representation_type(self, value): + # type: (PythonCallableWithSource) -> str + return value.get_source() + + def to_language_type(self, value): + # type: (str) -> PythonCallableWithSource + return PythonCallableWithSource(value) diff --git a/sdks/python/apache_beam/utils/python_callable.py b/sdks/python/apache_beam/utils/python_callable.py new file mode 100644 index 000000000000..9238e4de66ba --- /dev/null +++ b/sdks/python/apache_beam/utils/python_callable.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Python Callable utilities. + +For internal use only; no backwards-compatibility guarantees. +""" + + +class PythonCallableWithSource(object): + """Represents a Python callable object with source codes before evaluated. + + Proxy object to Store a callable object with its string form (source code). + The string form is used when the object is encoded and transferred to foreign + SDKs (non-Python SDKs). + """ + def __init__(self, source): + # type: (str) -> None + self._source = source + self._callable = eval(source) # pylint: disable=eval-used + + def get_source(self): + # type: () -> str + return self._source + + def __call__(self, *args, **kwargs): + return self._callable(*args, **kwargs) From 6797787df5bce9f73fe564aebcf8761e8666cca9 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Wed, 11 May 2022 20:34:20 -0700 Subject: [PATCH 2/7] add urn, type inference for PythonCallableSource --- .../model/pipeline/v1/beam_runner_api.proto | 7 ++++ .../schemas/logicaltypes/PythonCallable.java | 8 +++- .../python/PythonExternalTransform.java | 8 +++- .../python/PythonExternalTransformTest.java | 39 ++++++++++++++----- .../apache_beam/portability/common_urns.py | 3 ++ sdks/python/apache_beam/typehints/schemas.py | 3 +- .../apache_beam/typehints/schemas_test.py | 17 ++++++++ 7 files changed, 73 insertions(+), 12 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 54d328cce82c..3974a4f48627 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1094,6 +1094,13 @@ message StandardCoders { } } +message LogicalTypes { + enum Enum { + // A URN for Python Callable logical type + PYTHON_CALLABLE = 0 [(beam_urn) = "beam:logical_type:python_callable:v1"]; + } +} + // A windowing strategy describes the window function, triggering, allowed // lateness, and accumulation mode for a PCollection. // diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java index 6bd43cb8ba89..ac0d918e7bc3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.schemas.logicaltypes; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.LogicalTypes; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.LogicalType; @@ -27,7 +29,11 @@ /** A logical type for PythonCallableSource objects. */ @Experimental(Experimental.Kind.SCHEMAS) public class PythonCallable implements LogicalType { - public static final String IDENTIFIER = "beam:logical_type:python_callable:v1"; + public static final String IDENTIFIER = + LogicalTypes.Enum.PYTHON_CALLABLE + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); @Override public String getIdentifier() { diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java index 30f72429e5e9..b76ca21e5652 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java @@ -34,10 +34,12 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.utils.StaticSchemaInference; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.PythonCallableSource; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -202,12 +204,14 @@ Row buildOrGetKwargsRow() { // Types that are not one of following are considered custom types. // * Java primitives // * Type String - // * Type Row + // * Type PythonCallableSource // * Any Type explicitly annotated by withTypeHint() + // * Type Row private boolean isCustomType(java.lang.Class type) { boolean val = !(ClassUtils.isPrimitiveOrWrapper(type) || type == String.class + || type == PythonCallableSource.class || typeHints.containsKey(type) || Row.class.isAssignableFrom(type)); return val; @@ -264,6 +268,8 @@ private Schema generateSchemaDirectly( if (field instanceof Row) { // Rows are used as is but other types are converted to proper field types. builder.addRowField(fieldName, ((Row) field).getSchema()); + } else if (field instanceof PythonCallableSource) { + builder.addField(fieldName, Schema.FieldType.logicalType(new PythonCallable())); } else if (typeHints.containsKey(field.getClass())) { builder.addField(fieldName, typeHints.get(field.getClass())); } else { diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java index cfe7428ba2e5..5d55a6e8d345 100644 --- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java +++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java @@ -22,12 +22,13 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; +import java.time.Instant; import java.util.Map; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; -import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; @@ -187,18 +188,28 @@ public void generateArgsWithCustomType() { } @Test - public void generateArgsWithTypeHint() { + public void generateArgsWithPythonCallableSource() { PythonExternalTransform transform = PythonExternalTransform .>, PCollection>>>from( "DummyTransform") - .withArgs(PythonCallableSource.of("dummy data")) - .withTypeHint( - PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); + .withArgs(PythonCallableSource.of("dummy data")); Row receivedRow = transform.buildOrGetArgsRow(); assertTrue(receivedRow.getValue("field0") instanceof PythonCallableSource); } + @Test + public void generateArgsWithTypeHint() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withArgs(Instant.ofEpochSecond(0)) + .withTypeHint(Instant.class, Schema.FieldType.logicalType(new MicrosInstant())); + Row receivedRow = transform.buildOrGetArgsRow(); + assertTrue(receivedRow.getValue("field0") instanceof Instant); + } + @Test public void generateKwargsEmpty() { PythonExternalTransform transform = @@ -290,18 +301,28 @@ public void generateKwargsWithCustomType() { } @Test - public void generateKwargsWithTypeHint() { + public void generateKwargsWithPythonCallableSource() { PythonExternalTransform transform = PythonExternalTransform .>, PCollection>>>from( "DummyTransform") - .withKwarg("customField0", PythonCallableSource.of("dummy data")) - .withTypeHint( - PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); + .withKwarg("customField0", PythonCallableSource.of("dummy data")); Row receivedRow = transform.buildOrGetKwargsRow(); assertTrue(receivedRow.getValue("customField0") instanceof PythonCallableSource); } + @Test + public void generateKwargsWithTypeHint() { + PythonExternalTransform transform = + PythonExternalTransform + .>, PCollection>>>from( + "DummyTransform") + .withKwarg("customField0", Instant.ofEpochSecond(0)) + .withTypeHint(Instant.class, Schema.FieldType.logicalType(new MicrosInstant())); + Row receivedRow = transform.buildOrGetKwargsRow(); + assertTrue(receivedRow.getValue("customField0") instanceof Instant); + } + @Test public void generateKwargsFromMap() { Map kwargsMap = diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index daf54ea04da3..ad6b74b145eb 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -25,6 +25,7 @@ from .api import standard_window_fns_pb2_urns BeamConstants = beam_runner_api_pb2_urns.BeamConstants +LogicalTypes = beam_runner_api_pb2_urns.LogicalTypes StandardArtifacts = beam_runner_api_pb2_urns.StandardArtifacts StandardCoders = beam_runner_api_pb2_urns.StandardCoders StandardDisplayData = beam_runner_api_pb2_urns.StandardDisplayData @@ -76,3 +77,5 @@ displayData = StandardDisplayData.DisplayData java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP + +python_callable = LogicalTypes.Enum.PYTHON_CALLABLE \ No newline at end of file diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 74feb1466343..71bf64b63504 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -69,6 +69,7 @@ import numpy as np from google.protobuf import text_format +from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import row_type from apache_beam.typehints.native_type_compatibility import _get_args @@ -566,7 +567,7 @@ def to_language_type(self, value): class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): @classmethod def urn(cls): - return "beam:logical_type:python_callable:v1" + return common_urns.python_callable @classmethod def representation_type(cls): diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 834edf18777e..6de4b0f3efe5 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -31,6 +31,7 @@ import numpy as np +from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints.native_type_compatibility import match_is_named_tuple from apache_beam.typehints.schemas import SchemaTypeRegistry @@ -239,6 +240,22 @@ def test_float_maps_to_float64(self): schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE), typing_to_runner_api(float)) + def test_python_callable_maps_to_logical_type(self): + from apache_beam.utils.python_callable import PythonCallableWithSource + self.assertEqual( + schema_pb2.FieldType( + logical_type=schema_pb2.LogicalType( + urn=common_urns.python_callable, + representation=typing_to_runner_api(str))), + typing_to_runner_api(PythonCallableWithSource)) + self.assertEqual( + typing_from_runner_api( + schema_pb2.FieldType( + logical_type=schema_pb2.LogicalType( + urn=common_urns.python_callable, + representation=typing_to_runner_api(str)))), + PythonCallableWithSource) + def test_trivial_example(self): MyCuteClass = NamedTuple( 'MyCuteClass', From e2b2966281259b361e5601af5b8d718a548f7770 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Wed, 11 May 2022 22:29:12 -0700 Subject: [PATCH 3/7] fix lint errors --- sdks/python/apache_beam/portability/common_urns.py | 2 +- sdks/python/apache_beam/typehints/schemas.py | 2 +- sdks/python/apache_beam/typehints/schemas_test.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index ad6b74b145eb..1cda8e832626 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -78,4 +78,4 @@ java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP -python_callable = LogicalTypes.Enum.PYTHON_CALLABLE \ No newline at end of file +python_callable = LogicalTypes.Enum.PYTHON_CALLABLE diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 71bf64b63504..e34b30af7913 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -567,7 +567,7 @@ def to_language_type(self, value): class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): @classmethod def urn(cls): - return common_urns.python_callable + return common_urns.python_callable.urn @classmethod def representation_type(cls): diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 6de4b0f3efe5..404d9c5583c3 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -245,14 +245,14 @@ def test_python_callable_maps_to_logical_type(self): self.assertEqual( schema_pb2.FieldType( logical_type=schema_pb2.LogicalType( - urn=common_urns.python_callable, + urn=common_urns.python_callable.urn, representation=typing_to_runner_api(str))), typing_to_runner_api(PythonCallableWithSource)) self.assertEqual( typing_from_runner_api( schema_pb2.FieldType( logical_type=schema_pb2.LogicalType( - urn=common_urns.python_callable, + urn=common_urns.python_callable.urn, representation=typing_to_runner_api(str)))), PythonCallableWithSource) From 792baa2645909571ad0e86fe17a2081aaa690d12 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 12 May 2022 14:46:24 -0700 Subject: [PATCH 4/7] move logical types def --- .../beam/model/pipeline/v1/beam_runner_api.proto | 7 ------- .../apache/beam/model/pipeline/v1/schema.proto | 16 ++++++++++++++++ .../sdk/schemas/logicaltypes/PythonCallable.java | 4 ++-- .../apache_beam/portability/common_urns.py | 3 ++- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 3974a4f48627..54d328cce82c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1094,13 +1094,6 @@ message StandardCoders { } } -message LogicalTypes { - enum Enum { - // A URN for Python Callable logical type - PYTHON_CALLABLE = 0 [(beam_urn) = "beam:logical_type:python_callable:v1"]; - } -} - // A windowing strategy describes the window function, triggering, allowed // lateness, and accumulation mode for a PCollection. // diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index b26fc8fef8d6..af3c72ebaf8c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -31,6 +31,8 @@ option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v option java_package = "org.apache.beam.model.pipeline.v1"; option java_outer_classname = "SchemaApi"; +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; + message Schema { // List of fields for this schema. Two fields may not share a name. repeated Field fields = 1; @@ -110,6 +112,20 @@ message LogicalType { FieldValue argument = 5; } +// Universally defined Logical types for Row schemas. +// These logical types are supposed to be understood by all SDKs. +message LogicalTypes { + enum Enum { + // A URN for Python Callable logical type + // - Representation type: STRING + // - Language type: In Python SDK, PythonCallableWithSource. + // In any other SDKs, a wrapper object for a string which + // can be evaluated to a Python Callable object. + PYTHON_CALLABLE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:python_callable:v1"]; + } +} + message Option { // REQUIRED. Identifier for the option. string name = 1; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java index ac0d918e7bc3..ea4e297515e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.schemas.logicaltypes; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.model.pipeline.v1.RunnerApi.LogicalTypes; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.LogicalType; @@ -30,7 +30,7 @@ @Experimental(Experimental.Kind.SCHEMAS) public class PythonCallable implements LogicalType { public static final String IDENTIFIER = - LogicalTypes.Enum.PYTHON_CALLABLE + SchemaApi.LogicalTypes.Enum.PYTHON_CALLABLE .getValueDescriptor() .getOptions() .getExtension(RunnerApi.beamUrn); diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 1cda8e832626..6a9ff62a5e81 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -22,10 +22,10 @@ from .api import beam_runner_api_pb2_urns from .api import external_transforms_pb2_urns from .api import metrics_pb2_urns +from .api import schema_pb2_urns from .api import standard_window_fns_pb2_urns BeamConstants = beam_runner_api_pb2_urns.BeamConstants -LogicalTypes = beam_runner_api_pb2_urns.LogicalTypes StandardArtifacts = beam_runner_api_pb2_urns.StandardArtifacts StandardCoders = beam_runner_api_pb2_urns.StandardCoders StandardDisplayData = beam_runner_api_pb2_urns.StandardDisplayData @@ -40,6 +40,7 @@ MonitoringInfo = metrics_pb2_urns.MonitoringInfo MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns +LogicalTypes = schema_pb2_urns.LogicalTypes FixedWindowsPayload = standard_window_fns_pb2_urns.FixedWindowsPayload GlobalWindowsPayload = standard_window_fns_pb2_urns.GlobalWindowsPayload SessionWindowsPayload = standard_window_fns_pb2_urns.SessionWindowsPayload From 514e325845cf125448b1e38191db6746f092f24c Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 12 May 2022 15:10:50 -0700 Subject: [PATCH 5/7] add micros_instant urn --- .../org/apache/beam/model/pipeline/v1/schema.proto | 11 +++++++++-- .../beam/sdk/schemas/logicaltypes/MicrosInstant.java | 8 +++++++- sdks/python/apache_beam/portability/common_urns.py | 1 + sdks/python/apache_beam/typehints/schemas.py | 2 +- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index af3c72ebaf8c..3a6a79a6e2ea 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -117,12 +117,19 @@ message LogicalType { message LogicalTypes { enum Enum { // A URN for Python Callable logical type - // - Representation type: STRING - // - Language type: In Python SDK, PythonCallableWithSource. + // - Representation type: STRING + // - Language type: In Python SDK, PythonCallableWithSource. // In any other SDKs, a wrapper object for a string which // can be evaluated to a Python Callable object. PYTHON_CALLABLE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:logical_type:python_callable:v1"]; + + // A URN for MicrosInstant type + // - Representation type: ROW + // - A timestamp without a timezone where seconds + micros represents the + // amount of time since the epoch. + MICROS_INSTANT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:micros_instant:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java index 6c1fea85d842..a388731a14c5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/MicrosInstant.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.schemas.logicaltypes; import java.time.Instant; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; @@ -36,7 +38,11 @@ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class MicrosInstant implements Schema.LogicalType { - public static final String IDENTIFIER = "beam:logical_type:micros_instant:v1"; + public static final String IDENTIFIER = + SchemaApi.LogicalTypes.Enum.MICROS_INSTANT + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); // TODO(BEAM-10878): This should be a constant private final Schema schema; diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 6a9ff62a5e81..5e8a3ce4cce1 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -79,4 +79,5 @@ java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP +micros_instant = LogicalTypes.Enum.MICROS_INSTANT python_callable = LogicalTypes.Enum.PYTHON_CALLABLE diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index e34b30af7913..02eac46ae5d6 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -542,7 +542,7 @@ class MicrosInstant(NoArgumentLogicalType[Timestamp, MicrosInstantRepresentation]): @classmethod def urn(cls): - return "beam:logical_type:micros_instant:v1" + return common_urns.micros_instant.urn @classmethod def representation_type(cls): From 2fce769df1f45ac250e2ffd11a60f97a7499ead2 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 12 May 2022 16:12:46 -0700 Subject: [PATCH 6/7] put a default type hint for PythonCallableSource --- .../sdk/extensions/python/PythonExternalTransform.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java index b76ca21e5652..198a32c138ef 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java @@ -77,6 +77,10 @@ private PythonExternalTransform(String fullyQualifiedName, String expansionServi this.expansionService = expansionService; this.kwargsMap = new TreeMap<>(); this.typeHints = new HashMap<>(); + // TODO(BEAM-14458): remove a default type hint for PythonCallableSource when BEAM-14458 is + // resolved + this.typeHints.put( + PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); argsArray = new Object[] {}; } @@ -204,14 +208,12 @@ Row buildOrGetKwargsRow() { // Types that are not one of following are considered custom types. // * Java primitives // * Type String - // * Type PythonCallableSource // * Any Type explicitly annotated by withTypeHint() // * Type Row private boolean isCustomType(java.lang.Class type) { boolean val = !(ClassUtils.isPrimitiveOrWrapper(type) || type == String.class - || type == PythonCallableSource.class || typeHints.containsKey(type) || Row.class.isAssignableFrom(type)); return val; @@ -268,8 +270,6 @@ private Schema generateSchemaDirectly( if (field instanceof Row) { // Rows are used as is but other types are converted to proper field types. builder.addRowField(fieldName, ((Row) field).getSchema()); - } else if (field instanceof PythonCallableSource) { - builder.addField(fieldName, Schema.FieldType.logicalType(new PythonCallable())); } else if (typeHints.containsKey(field.getClass())) { builder.addField(fieldName, typeHints.get(field.getClass())); } else { From 2d36feb2b6a07e71ab947846f54bd0d63857c360 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 12 May 2022 18:02:09 -0700 Subject: [PATCH 7/7] add comment --- .../apache/beam/model/pipeline/v1/beam_runner_api.proto | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 54d328cce82c..a19229a7857f 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1046,12 +1046,8 @@ message StandardCoders { // Nullable types in container types (ArrayType, MapType) per the // encoding described for general Nullable types below. // - // Well known logical types: - // beam:logical_type:micros_instant:v1 - // - Representation type: ROW - // - A timestamp without a timezone where seconds + micros represents the - // amount of time since the epoch. - // + // Logical types understood by all SDKs should be defined in schema.proto. + // Example of well known logical types: // beam:logical_type:schema:v1 // - Representation type: BYTES // - A Beam Schema stored as a serialized proto.