Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
<module>tajo-metrics</module>
<module>tajo-core-tests</module>
<module>tajo-cluster-tests</module>
<module>tajo-tablespace-example</module>
<module>tajo-dist</module>
</modules>

Expand Down Expand Up @@ -144,7 +145,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<version>2.4.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
import org.apache.tajo.catalog.proto.CatalogProtos.TableIdentifierProto;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
Expand All @@ -50,44 +53,14 @@

public class CatalogUtil {


public static String getBackwardCompitableDataFormat(String dataFormat) {
return getDataFormatAsString(asDataFormat(dataFormat));
}

public static String getDataFormatAsString(final DataFormat type) {
if (type == DataFormat.TEXTFILE) {
return BuiltinStorages.TEXT;
} else {
return type.name();
}
}

public static DataFormat asDataFormat(final String typeStr) {
if (typeStr.equalsIgnoreCase("CSV")) {
return DataFormat.TEXTFILE;
} else if (typeStr.equalsIgnoreCase(DataFormat.RAW.name())) {
return CatalogProtos.DataFormat.RAW;
} else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.ROWFILE.name())) {
return DataFormat.ROWFILE;
} else if (typeStr.equalsIgnoreCase(DataFormat.RCFILE.name())) {
return DataFormat.RCFILE;
} else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.ORC.name())) {
return CatalogProtos.DataFormat.ORC;
} else if (typeStr.equalsIgnoreCase(DataFormat.PARQUET.name())) {
return DataFormat.PARQUET;
} else if (typeStr.equalsIgnoreCase(DataFormat.SEQUENCEFILE.name())) {
return DataFormat.SEQUENCEFILE;
} else if (typeStr.equalsIgnoreCase(DataFormat.AVRO.name())) {
return CatalogProtos.DataFormat.AVRO;
} else if (typeStr.equalsIgnoreCase(BuiltinStorages.TEXT)) {
return CatalogProtos.DataFormat.TEXTFILE;
} else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.JSON.name())) {
return CatalogProtos.DataFormat.JSON;
} else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.HBASE.name())) {
return CatalogProtos.DataFormat.HBASE;
} else {
return null;
final String upperDataFormat = dataFormat.toUpperCase();
switch (upperDataFormat) {
case "CSV":
case "TEXTFILE":
return BuiltinStorages.TEXT;
default:
return dataFormat;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,6 @@ option java_generate_equals_and_hash = true;
import "DataTypes.proto";
import "PrimitiveProtos.proto";

enum DataFormat {
MEM = 0;
TEXTFILE = 1;
RAW = 2;
RCFILE = 3;
ROWFILE = 4;
HCFILE = 5;
ORC = 6;
PARQUET = 7;
SEQUENCEFILE = 8;
AVRO = 9;
JSON = 10;
HBASE = 11;
SYSTEM = 12;
}

enum OrderType {
ORDER_NONE = 0;
ASC = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.util.Collections;
import java.util.List;

import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.exception.UndefinedTableException;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.DataFormat;

public class InfoSchemaMetadataDictionary {
private static final String DATABASE_NAME = "information_schema";
Expand Down Expand Up @@ -129,6 +129,6 @@ public boolean existTable(String tableName) {
}

protected String getTablePath() {
return DataFormat.SYSTEM.name().toUpperCase();
return BuiltinStorages.SYSTEM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,11 @@ public CatalogServer getMiniCatalogCluster() {
}

public CatalogService getCatalogService() {
return new LocalCatalogWrapper(catalogServer);
if (catalogServer != null) {
return new LocalCatalogWrapper(catalogServer);
} else {
return tajoMaster.getCatalog();
}
}

public boolean isHiveCatalogStoreRunning() {
Expand Down
12 changes: 12 additions & 0 deletions tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tajo;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -103,6 +104,17 @@ public TajoTestingCluster getTestingCluster() {
return util.getTestingCluster();
}

public ImmutableList<String> getNames() {
return ImmutableList.copyOf(names);
}

public Schema getSchema(String tableName) {
if (!nameMap.containsKey(tableName)) {
throw new RuntimeException("No such a table name '" + tableName + "'");
}
return schemas[nameMap.get(tableName)];
}

public String getPath(String tableName) {
if (!nameMap.containsKey(tableName)) {
throw new RuntimeException("No such a table name '" + tableName + "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public class BuiltinStorages {
public static final String SEQUENCE_FILE = "SEQUENCEFILE";
public static final String AVRO = "AVRO";
public static final String HBASE = "HBASE";
public static final String SYSTEM = "SYSTEM";
public static final String EX_HTTP_JSON = "EX_HTTP_JSON";
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class ErrorMessages {
ADD_MESSAGE(UNSUPPORTED_DATATYPE, "unsupported data type: '%s'", 1);
ADD_MESSAGE(INVALID_TABLE_PROPERTY, "invalid table property '%s': '%s'", 2);
ADD_MESSAGE(MISSING_TABLE_PROPERTY, "table property '%s' required for '%s'", 2);
ADD_MESSAGE(INVALID_TABLESPACE_URI, "Invalid tablespace '%s' for table '%s'", 2);

ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, "There is a directory which is assumed to be a partitioned directory" +
" : '%s'", 1);
Expand Down
1 change: 1 addition & 0 deletions tajo-common/src/main/proto/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ enum ResultCode {
UNSUPPORTED_DATATYPE = 1003; // SQLState: ? - Unsupported data type
INVALID_TABLE_PROPERTY = 1004; // SQLState: ? - Invalid Table Property
MISSING_TABLE_PROPERTY = 1005; // SQLState: ? - Missing table property
INVALID_TABLESPACE_URI = 1006;

// Client Connection
CLIENT_CONNECTION_EXCEPTION = 1101; // SQLState: 08000 - Client connection error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.FileUtil;
Expand Down Expand Up @@ -98,10 +99,11 @@ public void testDump4() throws Exception {
TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
printWriter.flush();
printWriter.close();
TableMeta meta = client.getTableDesc(getCurrentDatabase() + ".TableName1").getMeta();

assertOutputResult("testDump3.result", new String(bos.toByteArray()),
new String[]{"${index.path}", "${table.timezone}"},
new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "test_idx").toString(),
new String[]{TablespaceManager.getDefault().getTableUri(meta, getCurrentDatabase(), "test_idx").toString(),
testingCluster.getConfiguration().getSystemTimezone().getID()});
bos.close();
} finally {
Expand Down Expand Up @@ -137,10 +139,12 @@ public void testPartitionsDump() throws Exception {
printWriter.flush();
printWriter.close();

TableMeta meta = client.getTableDesc(getCurrentDatabase() + ".TableName3").getMeta();

assertOutputResult("testPartitionsDump.result", new String(bos.toByteArray()),
new String[]{"${partition.path1}", "${partition.path2}", "${table.timezone}"},
new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "TableName3").toString(),
TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "TableName4").toString(),
new String[]{TablespaceManager.getDefault().getTableUri(meta, getCurrentDatabase(), "TableName3").toString(),
TablespaceManager.getDefault().getTableUri(meta, getCurrentDatabase(), "TableName4").toString(),
testingCluster.getConfiguration().getSystemTimezone().getID()});

bos.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TpchTestBase;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.cli.tsql.commands.TajoShellCommand;
import org.apache.tajo.client.ClientParameters;
import org.apache.tajo.client.QueryStatus;
Expand Down Expand Up @@ -252,9 +253,10 @@ private void verifyDescTable(String sql, String tableName, String resultFileName
String consoleResult = new String(out.toByteArray());

if (!cluster.isHiveCatalogStoreRunning()) {
TableMeta meta = cluster.getCatalogService().getTableDesc("default", tableName).getMeta();
assertOutputResult(resultFileName, consoleResult, new String[]{"${table.timezone}", "${table.path}"},
new String[]{cluster.getConfiguration().getSystemTimezone().getID(),
TablespaceManager.getDefault().getTableUri("default", tableName).toString()});
TablespaceManager.getDefault().getTableUri(meta, "default", tableName).toString()});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public TableDesc create(QueryContext queryContext,
Tablespace tableSpace = getTablespaceHandler(tableSpaceName, uri);

TableDesc desc;
URI tableUri = isExternal ? uri : tableSpace.getTableUri(databaseName, simpleTableName);
URI tableUri = isExternal ? uri : tableSpace.getTableUri(meta, databaseName, simpleTableName);
desc = new TableDesc(qualifiedName, schema, meta, tableUri, isExternal);

if (partitionDesc != null) {
Expand Down
1 change: 1 addition & 0 deletions tajo-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
run cp -r $ROOT/tajo-sql-parser/target/tajo-sql-parser-${project.version}/* .
run cp -r $ROOT/tajo-storage/tajo-storage-jdbc/target/tajo-storage-jdbc-${project.version}.jar .
run cp -r $ROOT/tajo-storage/tajo-storage-pgsql/target/tajo-storage-pgsql-${project.version}.jar .
run cp -r $ROOT/tajo-tablespace-example/target/tajo-tablespace-example-${project.version}.jar .
run cp -r $ROOT/tajo-pullserver/target/tajo-pullserver-${project.version}.jar .
run cp -r $ROOT/tajo-metrics/target/tajo-metrics-${project.version}.jar .
run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar .
Expand Down
1 change: 0 additions & 1 deletion tajo-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static void setup() throws Exception {
rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(scoreSchema));
TableStats stats = new TableStats();

Path p = new Path(sm.getTableUri("default", "score"));
Path p = new Path(sm.getTableUri(scoreMeta, "default", "score"));
sm.getFileSystem().mkdirs(p);
Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score"));
appender.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,10 @@ private URI getCreatedTableURI(PlanContext context, CreateTable createTable) {
IdentifierUtil.extractQualifier(tableName) : context.queryContext.get(SessionVars.CURRENT_DATABASE);

return storage.getTableURI(
createTable.getTableSpaceName(), databaseName, IdentifierUtil.extractSimpleName(tableName));
createTable.getTableSpaceName(),
new TableMeta(createTable.getStorageType(), new KeyValueSet(createTable.getParams())),
databaseName,
IdentifierUtil.extractSimpleName(tableName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tajo.plan;

import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;

Expand All @@ -39,7 +40,7 @@ public interface StorageService {
* @param tableName Table name
* @return Table URI
*/
URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
URI getTableURI(@Nullable String spaceName, TableMeta meta, String databaseName, String tableName);

long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public LogicalNode visitLimit(LogicalPlanner.PlanContext ctx, Stack<Expr> stack,
public LogicalNode visitSort(LogicalPlanner.PlanContext ctx, Stack<Expr> stack, Sort expr) throws TajoException {
stack.push(expr);
LogicalNode child = visit(ctx, stack, expr.getChild());

stack.pop();

SortNode sortNode = ctx.getPlan().createNode(SortNode.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,16 @@ public LogicalNode visitLimit(ProcessorContext ctx, Stack<Expr> stack, Limit exp

@Override
public LogicalNode visitSort(ProcessorContext ctx, Stack<Expr> stack, Sort expr) throws TajoException {
for (Sort.SortSpec sortSpec : expr.getSortSpecs()) {
Set<ColumnReferenceExpr> columns = ExprFinder.finds(sortSpec.getKey(), OpType.Column);
for (ColumnReferenceExpr col : columns) {
if (!ctx.aliasSet.contains(col.getName())) {
NameRefInSelectListNormalizer.normalize(ctx.planContext, col);
TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), col);
}
}
}

super.visitSort(ctx, stack, expr);

SortNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ public Expr visitCreateTable(Context context, Stack<Expr> stack, CreateTable exp
if (expr.getStorageType() != null) {
if (expr.hasSelfDescSchema()) {
// TODO: support other types like Parquet and ORC.
if (!expr.getStorageType().equalsIgnoreCase(BuiltinStorages.JSON)) {
if (!expr.getStorageType().equalsIgnoreCase(BuiltinStorages.JSON) &&
!expr.getStorageType().equalsIgnoreCase(BuiltinStorages.EX_HTTP_JSON)) {
if (expr.getStorageType().equalsIgnoreCase(BuiltinStorages.PARQUET) ||
expr.getStorageType().equalsIgnoreCase(BuiltinStorages.ORC)) {
throw new NotImplementedException(expr.getStorageType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public String toString() {
return name + "=" + uri.toString();
}

public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter);

/**
* if {@link StorageProperty#isArbitraryPathAllowed} is true,
Expand All @@ -115,11 +115,12 @@ public URI getRootUri() {
/**
* Get Table URI
*
* @param meta table meta
* @param databaseName Database name
* @param tableName Table name
* @return Table URI
*/
public abstract URI getTableUri(String databaseName, String tableName);
public abstract URI getTableUri(TableMeta meta, String databaseName, String tableName);

/**
* Returns the splits that will serve as input for the scan tasks. The
Expand Down Expand Up @@ -280,13 +281,12 @@ public Appender getAppender(OverridableConf queryContext,
if (appenderClass == null) {
appenderClass = conf.getClass(
String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
if (appenderClass == null) {
throw new IOException("Undefined appender handler for " + meta.getDataFormat());
}
OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
}

if (appenderClass == null) {
throw new IOException("Unknown Storage Type: " + meta.getDataFormat());
}

appender = OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);

return appender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.MetadataProvider;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UndefinedTablespaceException;
Expand Down Expand Up @@ -430,9 +431,9 @@ public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
}

@Override
public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
public URI getTableURI(@Nullable String spaceName, TableMeta meta, String databaseName, String tableName) {
Tablespace space = spaceName == null ? getDefault() : getByName(spaceName);
return space.getTableUri(databaseName, tableName);
return space.getTableUri(meta, databaseName, tableName);
}

@Override
Expand Down
Loading