-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[BEAM-14430] Adding a logical type support for Python callables to Row schema #17608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5a5e51e
6797787
e2b2966
792baa2
514e325
2fce769
2d36feb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.schemas.logicaltypes; | ||
|
|
||
| import org.apache.beam.model.pipeline.v1.RunnerApi; | ||
| import org.apache.beam.model.pipeline.v1.SchemaApi; | ||
| import org.apache.beam.sdk.annotations.Experimental; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
| import org.apache.beam.sdk.schemas.Schema.LogicalType; | ||
| import org.apache.beam.sdk.util.PythonCallableSource; | ||
| import org.checkerframework.checker.nullness.qual.NonNull; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
|
||
| /** A logical type for PythonCallableSource objects. */ | ||
| @Experimental(Experimental.Kind.SCHEMAS) | ||
| public class PythonCallable implements LogicalType<PythonCallableSource, String> { | ||
| public static final String IDENTIFIER = | ||
| SchemaApi.LogicalTypes.Enum.PYTHON_CALLABLE | ||
| .getValueDescriptor() | ||
| .getOptions() | ||
| .getExtension(RunnerApi.beamUrn); | ||
|
|
||
| @Override | ||
| public String getIdentifier() { | ||
| return IDENTIFIER; | ||
| } | ||
|
|
||
| @Override | ||
| public Schema.@Nullable FieldType getArgumentType() { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public Schema.FieldType getBaseType() { | ||
| return Schema.FieldType.STRING; | ||
| } | ||
|
|
||
| @Override | ||
| public @NonNull String toBaseType(@NonNull PythonCallableSource input) { | ||
| return input.getPythonCallableCode(); | ||
| } | ||
|
|
||
| @Override | ||
| public @NonNull PythonCallableSource toInputType(@NonNull String base) { | ||
| return PythonCallableSource.of(base); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.util; | ||
|
|
||
| import java.io.Serializable; | ||
|
|
||
/**
 * A wrapper object storing Python source code that can be evaluated to a Python callable in the
 * Python SDK.
 *
 * <p>Instances are immutable and are obtained through {@link #of(String)}.
 */
public class PythonCallableSource implements Serializable {
  private final String pythonCallableCode;

  private PythonCallableSource(String pythonCallableCode) {
    // Fail fast on null so that getPythonCallableCode() never returns null.
    this.pythonCallableCode =
        java.util.Objects.requireNonNull(pythonCallableCode, "pythonCallableCode");
  }

  /**
   * Creates a wrapper around the given Python code.
   *
   * @param pythonCallableCode the Python code to wrap; it is stored as given, without validation
   * @return a new {@code PythonCallableSource} holding {@code pythonCallableCode}
   * @throws NullPointerException if {@code pythonCallableCode} is null
   */
  public static PythonCallableSource of(String pythonCallableCode) {
    // TODO(BEAM-14457): check syntactic correctness of Python code if possible
    return new PythonCallableSource(pythonCallableCode);
  }

  /**
   * Returns the wrapped Python code.
   *
   * @return the string passed to {@link #of(String)}; never null
   */
  public String getPythonCallableCode() {
    return pythonCallableCode;
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| package org.apache.beam.sdk.extensions.python; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.SortedMap; | ||
|
|
@@ -33,10 +34,12 @@ | |
| import org.apache.beam.sdk.schemas.Schema; | ||
| import org.apache.beam.sdk.schemas.SchemaRegistry; | ||
| import org.apache.beam.sdk.schemas.SchemaTranslation; | ||
| import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; | ||
| import org.apache.beam.sdk.schemas.utils.StaticSchemaInference; | ||
| import org.apache.beam.sdk.transforms.PTransform; | ||
| import org.apache.beam.sdk.transforms.SerializableFunction; | ||
| import org.apache.beam.sdk.util.CoderUtils; | ||
| import org.apache.beam.sdk.util.PythonCallableSource; | ||
| import org.apache.beam.sdk.values.PBegin; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
| import org.apache.beam.sdk.values.PCollectionTuple; | ||
|
|
@@ -64,6 +67,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut | |
| // We preserve the order here since Schemas care about order of fields but the order will not | ||
| // matter when applying kwargs at the Python side. | ||
| private SortedMap<String, Object> kwargsMap; | ||
| private Map<java.lang.Class<?>, Schema.FieldType> typeHints; | ||
|
|
||
| private @Nullable Object @NonNull [] argsArray; | ||
| private @Nullable Row providedKwargsRow; | ||
|
|
@@ -72,6 +76,11 @@ private PythonExternalTransform(String fullyQualifiedName, String expansionServi | |
| this.fullyQualifiedName = fullyQualifiedName; | ||
| this.expansionService = expansionService; | ||
| this.kwargsMap = new TreeMap<>(); | ||
| this.typeHints = new HashMap<>(); | ||
| // TODO(BEAM-14458): remove a default type hint for PythonCallableSource when BEAM-14458 is | ||
| // resolved | ||
| this.typeHints.put( | ||
| PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); | ||
| argsArray = new Object[] {}; | ||
| } | ||
|
|
||
|
|
@@ -162,6 +171,26 @@ public PythonExternalTransform<InputT, OutputT> withKwargs(Row kwargs) { | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Specifies the field type of arguments. | ||
| * | ||
| * <p>Type hints are especially useful for logical types since type inference does not work well | ||
| * for logical types. | ||
| * | ||
| * @param argType A class object for the argument type. | ||
| * @param fieldType A schema field type for the argument. | ||
| * @return updated wrapper for the cross-language transform. | ||
| */ | ||
| public PythonExternalTransform<InputT, OutputT> withTypeHint( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this be per arg instead of per type? In other words, can the same class map to different schema types?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Per type makes more sense to me. Do you have any specific per arg use-case in mind?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't. Just wasn't sure. We can keep this per type if Robert and Brian are OK. |
||
| java.lang.Class<?> argType, Schema.FieldType fieldType) { | ||
| if (typeHints.containsKey(argType)) { | ||
| throw new IllegalArgumentException( | ||
| String.format("typehint for arg type %s already exists", argType)); | ||
| } | ||
| typeHints.put(argType, fieldType); | ||
| return this; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| Row buildOrGetKwargsRow() { | ||
| if (providedKwargsRow != null) { | ||
|
|
@@ -179,16 +208,18 @@ Row buildOrGetKwargsRow() { | |
| // Types that are not one of following are considered custom types. | ||
| // * Java primitives | ||
| // * Type String | ||
| // * Any Type explicitly annotated by withTypeHint() | ||
| // * Type Row | ||
| private static boolean isCustomType(java.lang.Class<?> type) { | ||
| private boolean isCustomType(java.lang.Class<?> type) { | ||
| boolean val = | ||
| !(ClassUtils.isPrimitiveOrWrapper(type) | ||
| || type == String.class | ||
| || typeHints.containsKey(type) | ||
| || Row.class.isAssignableFrom(type)); | ||
| return val; | ||
| } | ||
|
|
||
| // If the custom type has a registered schema, we use that. OTherwise we try to register it using | ||
| // If the custom type has a registered schema, we use that. Otherwise, we try to register it using | ||
| // 'JavaFieldSchema'. | ||
| private Row convertCustomValue(Object value) { | ||
| SerializableFunction<Object, Row> toRowFunc; | ||
|
|
@@ -239,6 +270,8 @@ private Schema generateSchemaDirectly( | |
| if (field instanceof Row) { | ||
| // Rows are used as is but other types are converted to proper field types. | ||
| builder.addRowField(fieldName, ((Row) field).getSchema()); | ||
| } else if (typeHints.containsKey(field.getClass())) { | ||
| builder.addField(fieldName, typeHints.get(field.getClass())); | ||
| } else { | ||
| builder.addField( | ||
| fieldName, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.