Core: Implement Catalogs.createTable and Catalogs.dropTable #1481
New file: CatalogUtil.java (package org.apache.iceberg)

@@ -0,0 +1,119 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CatalogUtil {
  private static final Logger LOG = LoggerFactory.getLogger(CatalogUtil.class);

  private CatalogUtil() {
  }

  /**
   * Drops all data and metadata files referenced by TableMetadata.
   * <p>
   * This should be called by dropTable implementations to clean up table files once the table has been dropped in the
   * metastore.
   *
   * @param io a FileIO to use for deletes
   * @param metadata the last valid TableMetadata instance for a dropped table.
   */
  public static void dropTableData(FileIO io, TableMetadata metadata) {
    // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete
    // as much of the delete work as possible and avoid orphaned data or manifest files.

    Set<String> manifestListsToDelete = Sets.newHashSet();
    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
    for (Snapshot snapshot : metadata.snapshots()) {
      // add all manifests to the delete set because both data and delete files should be removed
      Iterables.addAll(manifestsToDelete, snapshot.allManifests());
      // add the manifest list to the delete set, if present
      if (snapshot.manifestListLocation() != null) {
        manifestListsToDelete.add(snapshot.manifestListLocation());
      }
    }

    LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));

    // run all of the deletes

    deleteFiles(io, manifestsToDelete);

    Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
        .noRetry().suppressFailureWhenFinished()
        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
        .run(io::deleteFile);

    Tasks.foreach(manifestListsToDelete)
        .noRetry().suppressFailureWhenFinished()
        .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc))
        .run(io::deleteFile);

    Tasks.foreach(metadata.metadataFileLocation())
        .noRetry().suppressFailureWhenFinished()
        .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file: {}", list, exc))
        .run(io::deleteFile);
  }

  private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
    // keep track of deleted files in a map that can be cleaned up when memory runs low
    Map<String, Boolean> deletedFiles = new MapMaker()
        .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
        .weakKeys()
        .makeMap();

    Tasks.foreach(allManifests)
        .noRetry().suppressFailureWhenFinished()
        .executeWith(ThreadPools.getWorkerPool())
        .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
        .run(manifest -> {
          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
            for (ManifestEntry<?> entry : reader.entries()) {
              // intern the file path because the weak key map uses identity (==) instead of equals
              String path = entry.file().path().toString().intern();
              Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
              if (alreadyDeleted == null || !alreadyDeleted) {
                try {
                  io.deleteFile(path);
                } catch (RuntimeException e) {
                  // this may happen if the map of deleted files gets cleaned up by gc
                  LOG.warn("Delete failed for data file: {}", path, e);
                }
              }
            }
          } catch (IOException e) {
            throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest.path());
          }
        });
  }
}
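For context, here is a rough, hypothetical sketch of how a catalog's dropTable implementation could use the new CatalogUtil.dropTableData helper: capture the last TableMetadata before the table is removed from the metastore, then pass it to the helper so data files, manifests, manifest lists, and metadata files are all deleted. Only CatalogUtil, TableOperations, TableMetadata, and TableIdentifier are real Iceberg types here; ExampleCatalog and its abstract helpers are illustrative assumptions, not part of this PR.

```java
// Illustrative sketch only; not code from this PR.
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;

public abstract class ExampleCatalog {
  // provided by the concrete catalog implementation (hypothetical helpers)
  protected abstract TableOperations newTableOps(TableIdentifier identifier);
  protected abstract boolean dropTableFromMetastore(TableIdentifier identifier);

  public boolean dropTable(TableIdentifier identifier, boolean purge) {
    TableOperations ops = newTableOps(identifier);
    // capture the last valid metadata before the metastore pointer goes away
    TableMetadata lastMetadata = purge ? ops.current() : null;

    if (!dropTableFromMetastore(identifier)) {
      return false; // table did not exist
    }

    if (purge && lastMetadata != null) {
      // deletes data files, manifests, manifest lists, and metadata files
      CatalogUtil.dropTableData(ops.io(), lastMetadata);
    }
    return true;
  }
}
```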
Changes to an existing file in package org.apache.iceberg.hadoop:

@@ -19,11 +19,14 @@

package org.apache.iceberg.hadoop;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;

@@ -144,6 +147,50 @@ public Table create(Schema schema, PartitionSpec spec, SortOrder order,
    return new BaseTable(ops, location);
  }

  /**
   * Drop a table and delete all data and metadata files.
   *
   * @param location a path URI (e.g. hdfs:///warehouse/my_table)
   * @return true if the table was dropped, false if it did not exist
   */
  public boolean dropTable(String location) {
    return dropTable(location, true);
  }

Review thread on dropTable(String location):
Contributor: I think this wasn't implemented before because it is not part of the `Tables` interface.
Contributor (author): Maybe this would merit another discussion, and another PR.
Contributor: Agreed.
  /**
   * Drop a table; optionally delete data and metadata files.
   * <p>
   * If purge is set to true the implementation should delete all data and metadata files.
   *
   * @param location a path URI (e.g. hdfs:///warehouse/my_table)
   * @param purge if true, delete all data and metadata files in the table
   * @return true if the table was dropped, false if it did not exist
   */
  public boolean dropTable(String location, boolean purge) {
    TableOperations ops = newTableOps(location);
    TableMetadata lastMetadata = null;
    if (ops.current() != null) {
      if (purge) {
        lastMetadata = ops.current();
      }
    } else {
      return false;
    }

    try {
      if (purge && lastMetadata != null) {
        // the data files and the metadata files may be stored in different locations,
        // so dropTableData is called to make sure all data and metadata files are deleted
        CatalogUtil.dropTableData(ops.io(), lastMetadata);
      }
      Path tablePath = new Path(location);
      Util.getFs(tablePath, conf).delete(tablePath, true /* recursive */);
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to delete file: " + location, e);
    }
  }
  @VisibleForTesting
  TableOperations newTableOps(String location) {
    if (location.contains(METADATA_JSON)) {
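A minimal usage sketch of the new methods follows. It assumes the class above is HadoopTables (the class name is not visible in this excerpt) and uses illustrative HDFS warehouse paths.

```java
// Hedged usage sketch: HadoopTables and its new dropTable overloads are taken from the diff above;
// the warehouse paths are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopTables;

public class DropTableExample {
  public static void main(String[] args) {
    HadoopTables tables = new HadoopTables(new Configuration());

    // drop the table and purge: deletes data files, manifests, manifest lists, and metadata files
    boolean dropped = tables.dropTable("hdfs:///warehouse/my_table");

    // drop without purging: skips CatalogUtil.dropTableData, but the table location itself
    // is still deleted recursively, so only files stored outside that location survive
    boolean droppedNoPurge = tables.dropTable("hdfs:///warehouse/other_table", false);

    System.out.println("dropped=" + dropped + ", droppedNoPurge=" + droppedNoPurge);
  }
}
```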