28 changes: 28 additions & 0 deletions be/src/runtime/descriptors.cpp
@@ -148,6 +148,28 @@ std::string BrokerTableDescriptor::debug_string() const {
return out.str();
}

HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc) {}

HiveTableDescriptor::~HiveTableDescriptor() {}

std::string HiveTableDescriptor::debug_string() const {
std::stringstream out;
out << "HiveTable(" << TableDescriptor::debug_string() << ")";
return out.str();
}

IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc) {}

IcebergTableDescriptor::~IcebergTableDescriptor() {}

std::string IcebergTableDescriptor::debug_string() const {
std::stringstream out;
out << "IcebergTable(" << TableDescriptor::debug_string() << ")";
return out.str();
}

EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}

EsTableDescriptor::~EsTableDescriptor() {}
@@ -524,6 +546,12 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
case TTableType::ES_TABLE:
desc = pool->add(new EsTableDescriptor(tdesc));
break;
case TTableType::HIVE_TABLE:
desc = pool->add(new HiveTableDescriptor(tdesc));
break;
case TTableType::ICEBERG_TABLE:
desc = pool->add(new IcebergTableDescriptor(tdesc));
break;
default:
DCHECK(false) << "invalid table type: " << tdesc.tableType;
}
18 changes: 18 additions & 0 deletions be/src/runtime/descriptors.h
@@ -190,6 +190,24 @@ class BrokerTableDescriptor : public TableDescriptor {
private:
};

class HiveTableDescriptor : public TableDescriptor {
public:
HiveTableDescriptor(const TTableDescriptor& tdesc);
virtual ~HiveTableDescriptor();
virtual std::string debug_string() const;

private:
};

class IcebergTableDescriptor : public TableDescriptor {
public:
IcebergTableDescriptor(const TTableDescriptor& tdesc);
virtual ~IcebergTableDescriptor();
virtual std::string debug_string() const;

private:
};

class EsTableDescriptor : public TableDescriptor {
public:
EsTableDescriptor(const TTableDescriptor& tdesc);
8 changes: 8 additions & 0 deletions fe/fe-core/pom.xml
@@ -599,6 +599,14 @@ under the License.
<artifactId>iceberg-hive-metastore</artifactId>
<scope>provided</scope>
</dependency>

<!-- For Iceberg, must be consistent with Iceberg version -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>provided</scope>
</dependency>
Review comment (Contributor): Add it to fe/pom.xml and reference it here.
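A minimal sketch of what the reviewer is asking for, assuming the usual Maven parent/child convention (the avro.version property name is illustrative, not taken from the PR): pin the version once under dependencyManagement in the parent fe/pom.xml, then reference the artifact version-less in fe/fe-core/pom.xml so it stays consistent with Iceberg.

<!-- fe/pom.xml (parent): hypothetical dependencyManagement entry -->
<dependencyManagement>
    <dependencies>
        <!-- keep in lock-step with the Iceberg version -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

With the version managed in the parent, the dependency block above then needs no <version> tag.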


</dependencies>
<build>
<finalName>palo-fe</finalName>
fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -134,7 +134,7 @@ public void readFields(DataInput in) throws IOException {
@Override
public TTableDescriptor toThrift() {
THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setHiveTable(tHiveTable);
return tTableDescriptor;
159 changes: 143 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
@@ -17,22 +17,38 @@

package org.apache.doris.catalog;


import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.external.iceberg.IcebergCatalog;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Maps;

import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* External Iceberg table
@@ -44,10 +60,23 @@ public class IcebergTable extends Table {
private String icebergDb;
// remote Iceberg table name
private String icebergTbl;
// remote Iceberg table location
private String location;
// Iceberg table file format
private String fileFormat;
// Iceberg storage type
private StorageBackend.StorageType storageType;
// Iceberg remote host uri
private String hostUri;
// location analyze flag
private boolean isAnalyzed = false;
private Map<String, String> icebergProperties = Maps.newHashMap();

private org.apache.iceberg.Table icebergTable;

private final byte[] loadLock = new byte[0]; // zero-length array: a cheap dedicated lock object
private final AtomicBoolean isLoaded = new AtomicBoolean(false);

public IcebergTable() {
super(TableType.ICEBERG);
}
@@ -73,28 +102,126 @@ public String getIcebergDb() {
return icebergDb;
}

public void setIcebergDb(String icebergDb) {
this.icebergDb = icebergDb;
}

public String getIcebergTbl() {
return icebergTbl;
}

public void setIcebergTbl(String icebergTbl) {
this.icebergTbl = icebergTbl;
}

public Map<String, String> getIcebergProperties() {
return icebergProperties;
}

public void setIcebergProperties(Map<String, String> icebergProperties) {
this.icebergProperties = icebergProperties;
// resolve the table location lazily, then analyze it into storageType and hostUri
private void getLocation() throws UserException {
if (Strings.isNullOrEmpty(location)) {
try {
getTable();
} catch (Exception e) {
throw new UserException("Failed to get table: " + name + ", error: " + e.getMessage());
}
location = icebergTable.location();
}
analyzeLocation();
}

private void analyzeLocation() throws UserException {
if (isAnalyzed) {
return;
}
String[] strings = StringUtils.split(location, "/");
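// For example, assuming a typical HDFS location such as
// "hdfs://host:9000/warehouse/db/tbl": StringUtils.split skips adjacent
// separators, so strings = ["hdfs:", "host:9000", "warehouse", "db", "tbl"];
// strings[0].split(":")[0] below is then "hdfs" and strings[1] is "host:9000".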

// analyze storage type
String storagePrefix = strings[0].split(":")[0];
if (storagePrefix.equalsIgnoreCase("s3")) {
this.storageType = StorageBackend.StorageType.S3;
} else if (storagePrefix.equalsIgnoreCase("hdfs")) {
this.storageType = StorageBackend.StorageType.HDFS;
} else {
throw new UserException("Not supported storage type: " + storagePrefix);
}

// analyze host uri
// eg: hdfs://host:port
// s3://host:port
String host = strings[1];
this.hostUri = storagePrefix + "://" + host;
this.isAnalyzed = true;
}

public String getHostUri() throws UserException {
if (!isAnalyzed) {
getLocation();
}
return hostUri;
}

public StorageBackend.StorageType getStorageType() throws UserException {
if (!isAnalyzed) {
getLocation();
}
return storageType;
}

public String getFileFormat() throws UserException {
if (Strings.isNullOrEmpty(fileFormat)) {
try {
getTable();
} catch (Exception e) {
throw new UserException("Failed to get table: " + name + ", error: " + e.getMessage());
}
// TableProperties.DEFAULT_FILE_FORMAT is Iceberg's "write.format.default" property
fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
}
return fileFormat;
}

// Get the Iceberg table instance, lazily loading it on first access.
// Double-checked: the atomic flag is the lock-free fast path; loadLock guards the one-time load.
private org.apache.iceberg.Table getTable() throws Exception {
if (isLoaded.get()) {
Preconditions.checkNotNull(icebergTable);
return icebergTable;
}
synchronized (loadLock) {
if (icebergTable != null) {
return icebergTable;
}

IcebergProperty icebergProperty = getIcebergProperty();
IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
try {
this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl));
LOG.info("finished loading iceberg table: {}", name);
} catch (Exception e) {
LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e);
throw e;
}

isLoaded.set(true);
return icebergTable;
}
}

public org.apache.iceberg.Table getIcebergTable() {
return icebergTable;
private IcebergProperty getIcebergProperty() {
Map<String, String> properties = Maps.newHashMap(icebergProperties);
properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb);
properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl);
return new IcebergProperty(properties);
}

/**
* Get Iceberg data files matching the given Iceberg predicates, as broker file statuses
* @throws Exception
*/
public List<TBrokerFileStatus> getIcebergDataFiles(List<Expression> predicates) throws Exception {
org.apache.iceberg.Table table = getTable();
TableScan scan = table.newScan();
for (Expression predicate : predicates) {
scan = scan.filter(predicate);
}
List<TBrokerFileStatus> relatedFiles = Lists.newArrayList();
for (FileScanTask task : scan.planFiles()) {
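// The first two name elements of a fully qualified path are the scheme and
// the authority: for an assumed layout like
// "hdfs://host:9000/warehouse/db/tbl/f.parquet" the elements are
// ["hdfs:", "host:9000", "warehouse", "db", "tbl", "f.parquet"], so
// subpath(2, count) strips scheme and authority and relativePath becomes
// "/warehouse/db/tbl/f.parquet", relative to the filesystem root.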
Path path = Paths.get(task.file().path().toString());
String relativePath = "/" + path.subpath(2, path.getNameCount());
relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false));
}
return relatedFiles;
}

@Override
Expand Down Expand Up @@ -128,7 +255,7 @@ public void readFields(DataInput in) throws IOException {
@Override
public TTableDescriptor toThrift() {
TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setIcebergTable(tIcebergTable);
return tTableDescriptor;
fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -28,6 +28,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
@@ -37,6 +38,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.loadv2.LoadTask;
@@ -137,6 +139,15 @@ public BrokerFileGroup(HiveTable table,
this.columnExprList = columnExprList;
}

// Used for Iceberg tables: there is no DataDescription to parse, so fixed defaults are used
public BrokerFileGroup(IcebergTable table) throws UserException {
this.tableId = table.getId();
this.isNegative = false;
this.valueSeparator = "|";
this.lineDelimiter = "\n";
this.fileFormat = table.getFileFormat();
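// Note: fileFormat is typically "parquet" or "orc" here; the separator and
// line delimiter above are presumably placeholders, since columnar formats
// are self-describing and only text-style formats would consume them.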
}

public BrokerFileGroup(DataDescription dataDescription) {
this.fileFieldNames = dataDescription.getFileFieldNames();
this.columnsFromPath = dataDescription.getColumnsFromPath();
fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -50,6 +50,7 @@
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

@@ -63,7 +64,6 @@
import java.util.Map.Entry;
import java.util.Set;

import avro.shaded.com.google.common.collect.Lists;

public class LoadChecker extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(LoadChecker.class);