Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
Expand All @@ -39,6 +44,7 @@
* Schema evolution API implementation.
*/
class SchemaUpdate implements UpdateSchema {
private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class);
private static final int TABLE_ROOT_ID = -1;

private final TableOperations ops;
Expand Down Expand Up @@ -200,7 +206,7 @@ public Schema apply() {

@Override
public void commit() {
TableMetadata update = base.updateSchema(apply(), lastColumnId);
TableMetadata update = applyChangesToMapping(base.updateSchema(apply(), lastColumnId));
ops.commit(base, update);
}

Expand All @@ -210,6 +216,30 @@ private int assignNewColumnId() {
return next;
}

private TableMetadata applyChangesToMapping(TableMetadata metadata) {
String mappingJson = metadata.property(TableProperties.DEFAULT_NAME_MAPPING, null);
if (mappingJson != null) {
try {
// parse and update the mapping
NameMapping mapping = NameMappingParser.fromJson(mappingJson);
NameMapping updated = MappingUtil.update(mapping, updates, adds);

// replace the table property
Map<String, String> updatedProperties = Maps.newHashMap();
updatedProperties.putAll(metadata.properties());
updatedProperties.put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(updated));

return metadata.replaceProperties(updatedProperties);

} catch (RuntimeException e) {
// log the error, but do not fail the update
LOG.warn("Failed to update external schema mapping: {}", mappingJson, e);
}
}

return metadata;
}

private static Schema applyChanges(Schema schema, List<Integer> deletes,
Map<Integer, Types.NestedField> updates,
Multimap<Integer, Types.NestedField> adds) {
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private TableProperties() {}

// This only applies to files written after this property is set. Files previously written aren't
// relocated to reflect this parameter.
// If not set, defaults to a "meatdata" folder underneath the root path of the table.
// If not set, defaults to a "metadata" folder underneath the root path of the table.
public static final String WRITE_METADATA_LOCATION = "write.metadata.path";

public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
Expand All @@ -91,4 +91,6 @@ private TableProperties() {}

public static final String DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default";
public static final String DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)";

public static final String DEFAULT_NAME_MAPPING = "schema.name-mapping.default";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're calling this default, but it's not really a default, right? Shouldn't this just be schema.name-mapping?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that there may eventually be multiple mappings. If you have two streams of data written by Kafka, for example, you may want a different mapping from source name to Iceberg columns.

The current name defines a default mapping, to be used when a mapping is needed but there is no specified mapping for a file. Later, we can add schema.name-mapping.(name) properties to add more than one.

}
96 changes: 96 additions & 0 deletions core/src/main/java/org/apache/iceberg/mapping/MappedField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mapping;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import java.util.Objects;
import java.util.Set;

/**
* An immutable mapping between a field ID and a set of names.
*/
public class MappedField {

static MappedField of(Integer id, String name) {
return new MappedField(id, ImmutableSet.of(name), null);
}

static MappedField of(Integer id, Iterable<String> names) {
return new MappedField(id, names, null);
}

static MappedField of(Integer id, String name, MappedFields nestedMapping) {
return new MappedField(id, ImmutableSet.of(name), nestedMapping);
}

static MappedField of(Integer id, Iterable<String> names, MappedFields nestedMapping) {
return new MappedField(id, names, nestedMapping);
}

private final Set<String> names;
private Integer id;
private MappedFields nestedMapping;

private MappedField(Integer id, Iterable<String> names, MappedFields nested) {
this.id = id;
this.names = ImmutableSet.copyOf(names);
this.nestedMapping = nested;
}

public Integer id() {
return id;
}

public Set<String> names() {
return names;
}

public MappedFields nestedMapping() {
return nestedMapping;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

MappedField that = (MappedField) other;
return names.equals(that.names) &&
Objects.equals(id, that.id) &&
Objects.equals(nestedMapping, that.nestedMapping);
}

@Override
public int hashCode() {
return Objects.hash(names, id, nestedMapping);
}

@Override
public String toString() {
return "([" + Joiner.on(", ").join(names) + "] -> " + (id != null ? id : "?") +
(nestedMapping != null ? ", " + nestedMapping + ")" : ")");
}
}
110 changes: 110 additions & 0 deletions core/src/main/java/org/apache/iceberg/mapping/MappedFields.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mapping;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MappedFields {

static MappedFields of(MappedField... fields) {
return new MappedFields(ImmutableList.copyOf(fields));
}

static MappedFields of(List<MappedField> fields) {
return new MappedFields(fields);
}

private final List<MappedField> fields;
private final Map<String, Integer> nameToId;
private final Map<Integer, MappedField> idToField;

private MappedFields(List<MappedField> fields) {
this.fields = ImmutableList.copyOf(fields);
this.nameToId = indexIds(fields);
this.idToField = indexFields(fields);
}

public MappedField field(int id) {
return idToField.get(id);
}

public Integer id(String name) {
return nameToId.get(name);
}

public int size() {
return fields.size();
}

private static Map<String, Integer> indexIds(List<MappedField> fields) {
ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
fields.forEach(field ->
field.names().forEach(name -> {
Integer id = field.id();
if (id != null) {
builder.put(name, id);
}
}));
return builder.build();
}

private static Map<Integer, MappedField> indexFields(List<MappedField> fields) {
ImmutableMap.Builder<Integer, MappedField> builder = ImmutableMap.builder();
fields.forEach(field -> {
Integer id = field.id();
if (id != null) {
builder.put(id, field);
}
});
return builder.build();
}

public List<MappedField> fields() {
return fields;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

return fields.equals(((MappedFields) other).fields);
}

@Override
public int hashCode() {
return Objects.hash(fields);
}

@Override
public String toString() {
return "[ " + Joiner.on(", ").join(fields) + " ]";
}
}
Loading