diff --git a/pom.xml b/pom.xml index 71c062bbfa..32c33f423b 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ tajo-metrics tajo-core-tests tajo-cluster-tests + tajo-tablespace-example tajo-dist @@ -144,7 +145,7 @@ maven-assembly-plugin - 2.3 + 2.4.1 org.apache.maven.plugins diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index f15ce030a0..a125ce27bd 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -26,7 +26,10 @@ import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.*; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto; +import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableIdentifierProto; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; @@ -50,44 +53,14 @@ public class CatalogUtil { - public static String getBackwardCompitableDataFormat(String dataFormat) { - return getDataFormatAsString(asDataFormat(dataFormat)); - } - - public static String getDataFormatAsString(final DataFormat type) { - if (type == DataFormat.TEXTFILE) { - return BuiltinStorages.TEXT; - } else { - return type.name(); - } - } - - public static DataFormat asDataFormat(final String typeStr) { - if (typeStr.equalsIgnoreCase("CSV")) { - return DataFormat.TEXTFILE; - } else if (typeStr.equalsIgnoreCase(DataFormat.RAW.name())) { - return CatalogProtos.DataFormat.RAW; - } else if 
(typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.ROWFILE.name())) { - return DataFormat.ROWFILE; - } else if (typeStr.equalsIgnoreCase(DataFormat.RCFILE.name())) { - return DataFormat.RCFILE; - } else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.ORC.name())) { - return CatalogProtos.DataFormat.ORC; - } else if (typeStr.equalsIgnoreCase(DataFormat.PARQUET.name())) { - return DataFormat.PARQUET; - } else if (typeStr.equalsIgnoreCase(DataFormat.SEQUENCEFILE.name())) { - return DataFormat.SEQUENCEFILE; - } else if (typeStr.equalsIgnoreCase(DataFormat.AVRO.name())) { - return CatalogProtos.DataFormat.AVRO; - } else if (typeStr.equalsIgnoreCase(BuiltinStorages.TEXT)) { - return CatalogProtos.DataFormat.TEXTFILE; - } else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.JSON.name())) { - return CatalogProtos.DataFormat.JSON; - } else if (typeStr.equalsIgnoreCase(CatalogProtos.DataFormat.HBASE.name())) { - return CatalogProtos.DataFormat.HBASE; - } else { - return null; + final String upperDataFormat = dataFormat.toUpperCase(); + switch (upperDataFormat) { + case "CSV": + case "TEXTFILE": + return BuiltinStorages.TEXT; + default: + return dataFormat; } } diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index b42cf58a5f..ea5f933bd0 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -25,22 +25,6 @@ option java_generate_equals_and_hash = true; import "DataTypes.proto"; import "PrimitiveProtos.proto"; -enum DataFormat { - MEM = 0; - TEXTFILE = 1; - RAW = 2; - RCFILE = 3; - ROWFILE = 4; - HCFILE = 5; - ORC = 6; - PARQUET = 7; - SEQUENCEFILE = 8; - AVRO = 9; - JSON = 10; - HBASE = 11; - SYSTEM = 12; -} - enum OrderType { ORDER_NONE = 0; ASC = 1; diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java index d9008aa1b7..ea608ebfca 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java @@ -22,9 +22,9 @@ import java.util.Collections; import java.util.List; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.exception.UndefinedTableException; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.DataFormat; public class InfoSchemaMetadataDictionary { private static final String DATABASE_NAME = "information_schema"; @@ -129,6 +129,6 @@ public boolean existTable(String tableName) { } protected String getTablePath() { - return DataFormat.SYSTEM.name().toUpperCase(); + return BuiltinStorages.SYSTEM; } } diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index 4b89c8e274..2126ceac9d 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -334,7 +334,11 @@ public CatalogServer getMiniCatalogCluster() { } public CatalogService getCatalogService() { - return new LocalCatalogWrapper(catalogServer); + if (catalogServer != null) { + return new LocalCatalogWrapper(catalogServer); + } else { + return tajoMaster.getCatalog(); + } } public boolean isHiveCatalogStoreRunning() { diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java index 70d25b48e6..5e2f654264 100644 --- 
a/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java @@ -18,6 +18,7 @@ package org.apache.tajo; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -103,6 +104,17 @@ public TajoTestingCluster getTestingCluster() { return util.getTestingCluster(); } + public ImmutableList getNames() { + return ImmutableList.copyOf(names); + } + + public Schema getSchema(String tableName) { + if (!nameMap.containsKey(tableName)) { + throw new RuntimeException("No such a table name '" + tableName + "'"); + } + return schemas[nameMap.get(tableName)]; + } + public String getPath(String tableName) { if (!nameMap.containsKey(tableName)) { throw new RuntimeException("No such a table name '" + tableName + "'"); diff --git a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java index 2c8f68617c..aa7a9e73f5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java +++ b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java @@ -30,4 +30,6 @@ public class BuiltinStorages { public static final String SEQUENCE_FILE = "SEQUENCEFILE"; public static final String AVRO = "AVRO"; public static final String HBASE = "HBASE"; + public static final String SYSTEM = "SYSTEM"; + public static final String EX_HTTP_JSON = "EX_HTTP_JSON"; } diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java index a6222cf9fb..8f45bdd0ce 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java @@ -112,6 +112,7 @@ public class ErrorMessages { ADD_MESSAGE(UNSUPPORTED_DATATYPE, "unsupported data type: '%s'", 1); 
ADD_MESSAGE(INVALID_TABLE_PROPERTY, "invalid table property '%s': '%s'", 2); ADD_MESSAGE(MISSING_TABLE_PROPERTY, "table property '%s' required for '%s'", 2); + ADD_MESSAGE(INVALID_TABLESPACE_URI, "Invalid tablespace '%s' for table '%s'", 2); ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, "There is a directory which is assumed to be a partitioned directory" + " : '%s'", 1); diff --git a/tajo-common/src/main/proto/errors.proto b/tajo-common/src/main/proto/errors.proto index 6e13eb1490..cfb7242c96 100644 --- a/tajo-common/src/main/proto/errors.proto +++ b/tajo-common/src/main/proto/errors.proto @@ -172,6 +172,7 @@ enum ResultCode { UNSUPPORTED_DATATYPE = 1003; // SQLState: ? - Unsupported data type INVALID_TABLE_PROPERTY = 1004; // SQLState: ? - Invalid Table Property MISSING_TABLE_PROPERTY = 1005; // SQLState: ? - Missing table property + INVALID_TABLESPACE_URI = 1006; // Client Connection CLIENT_CONNECTION_EXCEPTION = 1101; // SQLState: 08000 - Client connection error diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java index 57a6aa3ffe..727bba9f02 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.auth.UserRoleInfo; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.FileUtil; @@ -98,10 +99,11 @@ public void testDump4() throws Exception { TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter); printWriter.flush(); printWriter.close(); + TableMeta meta = client.getTableDesc(getCurrentDatabase() + ".TableName1").getMeta(); assertOutputResult("testDump3.result", new String(bos.toByteArray()), 
new String[]{"${index.path}", "${table.timezone}"}, - new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "test_idx").toString(), + new String[]{TablespaceManager.getDefault().getTableUri(meta, getCurrentDatabase(), "test_idx").toString(), testingCluster.getConfiguration().getSystemTimezone().getID()}); bos.close(); } finally { @@ -137,10 +139,12 @@ public void testPartitionsDump() throws Exception { printWriter.flush(); printWriter.close(); + TableMeta meta = client.getTableDesc(getCurrentDatabase() + ".TableName3").getMeta(); + assertOutputResult("testPartitionsDump.result", new String(bos.toByteArray()), new String[]{"${partition.path1}", "${partition.path2}", "${table.timezone}"}, - new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "TableName3").toString(), - TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "TableName4").toString(), + new String[]{TablespaceManager.getDefault().getTableUri(meta, getCurrentDatabase(), "TableName3").toString(), + TablespaceManager.getDefault().getTableUri(meta, getCurrentDatabase(), "TableName4").toString(), testingCluster.getConfiguration().getSystemTimezone().getID()}); bos.close(); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index 47c5a35639..816fbcda63 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -29,6 +29,7 @@ import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.TpchTestBase; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.cli.tsql.commands.TajoShellCommand; import org.apache.tajo.client.ClientParameters; import org.apache.tajo.client.QueryStatus; @@ -252,9 +253,10 @@ private void verifyDescTable(String sql, String tableName, String resultFileName 
String consoleResult = new String(out.toByteArray()); if (!cluster.isHiveCatalogStoreRunning()) { + TableMeta meta = cluster.getCatalogService().getTableDesc("default", tableName).getMeta(); assertOutputResult(resultFileName, consoleResult, new String[]{"${table.timezone}", "${table.path}"}, new String[]{cluster.getConfiguration().getSystemTimezone().getID(), - TablespaceManager.getDefault().getTableUri("default", tableName).toString()}); + TablespaceManager.getDefault().getTableUri(meta, "default", tableName).toString()}); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java index 9c3a1ad0b1..a95239c777 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java @@ -101,7 +101,7 @@ public TableDesc create(QueryContext queryContext, Tablespace tableSpace = getTablespaceHandler(tableSpaceName, uri); TableDesc desc; - URI tableUri = isExternal ? uri : tableSpace.getTableUri(databaseName, simpleTableName); + URI tableUri = isExternal ? uri : tableSpace.getTableUri(meta, databaseName, simpleTableName); desc = new TableDesc(qualifiedName, schema, meta, tableUri, isExternal); if (partitionDesc != null) { diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index 7280e1f2fc..cbbd763a08 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -146,6 +146,7 @@ run cp -r $ROOT/tajo-sql-parser/target/tajo-sql-parser-${project.version}/* . run cp -r $ROOT/tajo-storage/tajo-storage-jdbc/target/tajo-storage-jdbc-${project.version}.jar . run cp -r $ROOT/tajo-storage/tajo-storage-pgsql/target/tajo-storage-pgsql-${project.version}.jar . + run cp -r $ROOT/tajo-tablespace-example/target/tajo-tablespace-example-${project.version}.jar . run cp -r $ROOT/tajo-pullserver/target/tajo-pullserver-${project.version}.jar . 
run cp -r $ROOT/tajo-metrics/target/tajo-metrics-${project.version}.jar . run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar . diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml index b42d90de3f..fc9f39dbbe 100644 --- a/tajo-jdbc/pom.xml +++ b/tajo-jdbc/pom.xml @@ -76,7 +76,6 @@ maven-assembly-plugin - 2.4.1 jar-with-dependencies diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java index 3ed37874a7..af4011d5b4 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java @@ -76,7 +76,7 @@ public static void setup() throws Exception { rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(scoreSchema)); TableStats stats = new TableStats(); - Path p = new Path(sm.getTableUri("default", "score")); + Path p = new Path(sm.getTableUri(scoreMeta, "default", "score")); sm.getFileSystem().mkdirs(p); Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score")); appender.init(); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index b64665839c..51b2568ce8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -2009,7 +2009,10 @@ private URI getCreatedTableURI(PlanContext context, CreateTable createTable) { IdentifierUtil.extractQualifier(tableName) : context.queryContext.get(SessionVars.CURRENT_DATABASE); return storage.getTableURI( - createTable.getTableSpaceName(), databaseName, IdentifierUtil.extractSimpleName(tableName)); + createTable.getTableSpaceName(), + new TableMeta(createTable.getStorageType(), new KeyValueSet(createTable.getParams())), + databaseName, + IdentifierUtil.extractSimpleName(tableName)); } } diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java index cbb7387146..73fdecad7d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; @@ -39,7 +40,7 @@ public interface StorageService { * @param tableName Table name * @return Table URI */ - URI getTableURI(@Nullable String spaceName, String databaseName, String tableName); + URI getTableURI(@Nullable String spaceName, TableMeta meta, String databaseName, String tableName); long getTableVolumn(TableDesc table, Optional filter) throws UnsupportedException; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseSchemaBuildPhase.java index 14fb3ef8dd..e78a0d6c43 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseSchemaBuildPhase.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseSchemaBuildPhase.java @@ -293,6 +293,7 @@ public LogicalNode visitLimit(LogicalPlanner.PlanContext ctx, Stack stack, public LogicalNode visitSort(LogicalPlanner.PlanContext ctx, Stack stack, Sort expr) throws TajoException { stack.push(expr); LogicalNode child = visit(ctx, stack, expr.getChild()); + stack.pop(); SortNode sortNode = ctx.getPlan().createNode(SortNode.class); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java index a49c42389f..9b8b15fc6e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java +++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java @@ -206,6 +206,16 @@ public LogicalNode visitLimit(ProcessorContext ctx, Stack stack, Limit exp @Override public LogicalNode visitSort(ProcessorContext ctx, Stack stack, Sort expr) throws TajoException { + for (Sort.SortSpec sortSpec : expr.getSortSpecs()) { + Set columns = ExprFinder.finds(sortSpec.getKey(), OpType.Column); + for (ColumnReferenceExpr col : columns) { + if (!ctx.aliasSet.contains(col.getName())) { + NameRefInSelectListNormalizer.normalize(ctx.planContext, col); + TUtil.putToNestedList(ctx.projectColumns, col.getQualifier(), col); + } + } + } + super.visitSort(ctx, stack, expr); SortNode node = getNodeFromExpr(ctx.planContext.getPlan(), expr); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java index 2b197ff0ec..27df7f8184 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java @@ -254,7 +254,8 @@ public Expr visitCreateTable(Context context, Stack stack, CreateTable exp if (expr.getStorageType() != null) { if (expr.hasSelfDescSchema()) { // TODO: support other types like Parquet and ORC. 
- if (!expr.getStorageType().equalsIgnoreCase(BuiltinStorages.JSON)) { + if (!expr.getStorageType().equalsIgnoreCase(BuiltinStorages.JSON) && + !expr.getStorageType().equalsIgnoreCase(BuiltinStorages.EX_HTTP_JSON)) { if (expr.getStorageType().equalsIgnoreCase(BuiltinStorages.PARQUET) || expr.getStorageType().equalsIgnoreCase(BuiltinStorages.ORC)) { throw new NotImplementedException(expr.getStorageType()); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 4fcd5dc97e..64b78d6bfd 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -99,7 +99,7 @@ public String toString() { return name + "=" + uri.toString(); } - public abstract long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException; + public abstract long getTableVolume(TableDesc table, Optional filter); /** * if {@link StorageProperty#isArbitraryPathAllowed} is true, @@ -115,11 +115,12 @@ public URI getRootUri() { /** * Get Table URI * + * @param meta table meta * @param databaseName Database name * @param tableName Table name * @return Table URI */ - public abstract URI getTableUri(String databaseName, String tableName); + public abstract URI getTableUri(TableMeta meta, String databaseName, String tableName); /** * Returns the splits that will serve as input for the scan tasks. 
The @@ -280,13 +281,12 @@ public Appender getAppender(OverridableConf queryContext, if (appenderClass == null) { appenderClass = conf.getClass( String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class); + if (appenderClass == null) { + throw new IOException("Undefined appender handler for " + meta.getDataFormat()); + } OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); } - if (appenderClass == null) { - throw new IOException("Unknown Storage Type: " + meta.getDataFormat()); - } - appender = OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); return appender; diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 88410bb2cc..9721df0259 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -32,6 +32,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.MetadataProvider; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UndefinedTablespaceException; @@ -430,9 +431,9 @@ public static Optional getAnyByScheme(String scheme) { } @Override - public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) { + public URI getTableURI(@Nullable String spaceName, TableMeta meta, String databaseName, String tableName) { Tablespace space = spaceName == null ? 
getDefault() : getByName(spaceName); - return space.getTableUri(databaseName, tableName); + return space.getTableUri(meta, databaseName, tableName); } @Override diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java index 9c4fce5c92..cc7eea55fb 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java @@ -22,4 +22,5 @@ public class BuiltinFragmentKinds { public static final String FILE = "FILE"; public static final String HBASE = "HBASE"; public static final String JDBC = "JDBC"; + public static final String HTTP = "EXAMPLE-HTTP"; } diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 4e5720490d..ce0ce85393 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -39,7 +39,7 @@ tajo.storage.scanner-handler - text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase + text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json @@ -55,6 +55,10 @@ tajo.storage.fragment.kind.jdbc org.apache.tajo.storage.jdbc.JdbcFragment + + tajo.storage.fragment.kind.example-http + org.apache.tajo.storage.http.ExampleHttpFileFragment + tajo.storage.fragment.serde.file org.apache.tajo.storage.fragment.FileFragmentSerde @@ -67,6 +71,10 @@ tajo.storage.fragment.serde.jdbc org.apache.tajo.storage.jdbc.JdbcFragmentSerde + + tajo.storage.fragment.serde.example-http + org.apache.tajo.storage.http.ExampleHttpFileFragmentSerde + @@ -123,7 +131,12 @@ tajo.storage.scanner-handler.hbase.class 
org.apache.tajo.storage.hbase.HBaseScanner - + + + tajo.storage.scanner-handler.ex_http_json.class + org.apache.tajo.storage.http.ExampleHttpJsonScanner + + tajo.storage.appender-handler diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index 5bf6b0b45c..1737e221c7 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -38,7 +38,7 @@ tajo.storage.scanner-handler - text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase + text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json @@ -126,6 +126,11 @@ org.apache.tajo.storage.hbase.HBaseScanner + + tajo.storage.scanner-handler.ex_http_json.class + org.apache.tajo.storage.http.ExampleHttpJsonScanner + + tajo.storage.appender-handler diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index e3f7c25e4b..2dbd137682 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -396,7 +396,7 @@ public void purgeTable(TableDesc tableDesc) throws IOException, TajoException { } @Override - public URI getTableUri(String databaseName, String tableName) { + public URI getTableUri(TableMeta meta, String databaseName, String tableName) { return URI.create(uri.toString() + "/" + tableName); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java index 94e07e5193..58001b934c 100644 --- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -18,8 +18,6 @@ package org.apache.tajo.storage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,33 +30,32 @@ import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.AbstractFileFragment; import org.apache.tajo.storage.fragment.Fragment; import java.io.IOException; public abstract class FileScanner implements Scanner { - private static final Log LOG = LogFactory.getLog(FileScanner.class); protected boolean inited = false; protected final Configuration conf; protected final TableMeta meta; protected final Schema schema; - protected final FileFragment fragment; + protected final AbstractFileFragment fragment; protected final int columnNum; protected Column [] targets; protected float progress; - protected TableStats tableStats; + protected TableStats inputStats; public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) { this.conf = conf; this.meta = meta; this.schema = schema; - this.fragment = (FileFragment)fragment; - this.tableStats = new TableStats(); + this.fragment = (AbstractFileFragment)fragment; + this.inputStats = new TableStats(); this.columnNum = this.schema.size(); } @@ -67,14 +64,14 @@ public void init() throws IOException { progress = 0.0f; if (fragment != null) { - tableStats.setNumBytes(fragment.getLength()); - tableStats.setNumBlocks(1); + inputStats.setNumBytes(fragment.getLength()); + inputStats.setNumBlocks(1); } if (schema != null) { for(Column eachColumn: 
schema.getRootColumns()) { ColumnStats columnStats = new ColumnStats(eachColumn); - tableStats.addColumnStat(columnStats); + inputStats.addColumnStat(columnStats); } } } @@ -112,6 +109,6 @@ public float getProgress() { @Override public TableStats getInputStats() { - return tableStats; + return inputStats; } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 17c413ebfa..2785de4ea4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -126,7 +126,7 @@ protected void storageInit() throws IOException { } @Override - public long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException { + public long getTableVolume(TableDesc table, Optional filter) { Path path = new Path(table.getUri()); ContentSummary summary; try { @@ -169,7 +169,7 @@ public boolean exists(Path path) throws IOException { } @Override - public URI getTableUri(String databaseName, String tableName) { + public URI getTableUri(TableMeta meta, String databaseName, String tableName) { return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index a6850c14c2..348bb3cd7c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -435,12 +435,12 @@ public boolean isSplittable(){ @Override public TableStats getInputStats() { - if(tableStats != null){ - tableStats.setNumRows(recordCount); - tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n) - 
tableStats.setNumBytes(fragment.getLength()); + if(inputStats != null){ + inputStats.setNumRows(recordCount); + inputStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n) + inputStats.setNumBytes(fragment.getLength()); } - return tableStats; + return inputStats; } @Override diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/AbstractFileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/AbstractFileFragment.java new file mode 100644 index 0000000000..cf726e2bc7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/AbstractFileFragment.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.fragment; + +import org.apache.hadoop.fs.Path; + +import java.net.URI; + +/** + * Abstract fragment implementation for file systems. 
+ */ +public abstract class AbstractFileFragment extends Fragment { + + protected AbstractFileFragment(String kind, + URI uri, + String inputSourceId, + long startKey, + long endKey, + long length, + String[] hostNames) { + super(kind, uri, inputSourceId, startKey, endKey, length, hostNames); + } + + public Path getPath() { + return new Path(uri); + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index 7bdf0cbee3..0ead6009a4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -30,7 +30,7 @@ /** * Fragment for file systems. */ -public class FileFragment extends Fragment { +public class FileFragment extends AbstractFileFragment { private Integer[] diskIds; // disk volume ids public FileFragment(String tableName, Path uri, BlockLocation blockLocation) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java index 1329955942..0a20d669d0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java @@ -300,8 +300,8 @@ public void reset() throws IOException { public void close() throws IOException { if (recordReader != null) { recordReader.close(); - tableStats.setNumBytes(recordReader.getNumBytes()); - tableStats.setNumRows(recordCount); + inputStats.setNumBytes(recordReader.getNumBytes()); + inputStats.setNumRows(recordCount); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java index 1e2380e2eb..d876e36c91 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -171,9 +171,9 @@ public void reset() throws IOException { @Override public void close() throws IOException { - if (tableStats != null) { - tableStats.setReadBytes(filePosition - fragment.getStartKey()); - tableStats.setNumRows(recordCount); + if (inputStats != null) { + inputStats.setReadBytes(filePosition - fragment.getStartKey()); + inputStats.setNumRows(recordCount); } if(tupleBuffer != null) { tupleBuffer.release(); @@ -206,12 +206,12 @@ public boolean isSplittable(){ @Override public TableStats getInputStats() { - if(tableStats != null){ - tableStats.setNumRows(recordCount); - tableStats.setReadBytes(filePosition - fragment.getStartKey()); // actual read bytes (scan + rescan * n) - tableStats.setNumBytes(fragment.getLength()); + if(inputStats != null){ + inputStats.setNumRows(recordCount); + inputStats.setReadBytes(filePosition - fragment.getStartKey()); // actual read bytes (scan + rescan * n) + inputStats.setNumBytes(fragment.getLength()); } - return tableStats; + return inputStats; } @Override diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index ebdc4722a6..2d21928343 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -1591,9 +1591,9 @@ protected void currentValueBuffer() throws IOException { currentValue.inited = true; readBytes += currentValue.getReadBytes(); - if (tableStats != null) { - 
tableStats.setReadBytes(readBytes); - tableStats.setNumRows(passedRowsNum); + if (inputStats != null) { + inputStats.setReadBytes(readBytes); + inputStats.setNumRows(passedRowsNum); } } @@ -1776,9 +1776,9 @@ public boolean isSplittable() { @Override public void close() throws IOException { - if (tableStats != null) { - tableStats.setReadBytes(readBytes); //Actual Processed Bytes. (decompressed bytes + header - seek) - tableStats.setNumRows(passedRowsNum); + if (inputStats != null) { + inputStats.setReadBytes(readBytes); //Actual Processed Bytes. (decompressed bytes + header - seek) + inputStats.setNumRows(passedRowsNum); } IOUtils.cleanup(LOG, in, currentValue); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index b7dc1eca9f..74ded01385 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -330,9 +330,9 @@ public void close() throws IOException { if (reader != null) reader.close(); - if (tableStats != null) { - tableStats.setReadBytes(totalBytes); - tableStats.setNumRows(currentIdx); + if (inputStats != null) { + inputStats.setReadBytes(totalBytes); + inputStats.setNumRows(currentIdx); } outTuple = null; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index a2688b1213..c383eba99f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -36,7 +36,7 @@ import 
org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.*; import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.AbstractFileFragment; import org.apache.tajo.unit.StorageUnit; import java.io.*; @@ -57,15 +57,16 @@ public class DelimitedLineReader implements Closeable { private boolean eof = true; private ByteBufLineReader lineReader; private AtomicInteger lineReadBytes = new AtomicInteger(); - private FileFragment fragment; + private AbstractFileFragment fragment; private Configuration conf; private int bufferSize; - public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException { + public DelimitedLineReader(Configuration conf, final AbstractFileFragment fragment) throws IOException { this(conf, fragment, 128 * StorageUnit.KB); } - public DelimitedLineReader(Configuration conf, final FileFragment fragment, int bufferSize) throws IOException { + public DelimitedLineReader(Configuration conf, final AbstractFileFragment fragment, int bufferSize) + throws IOException { this.fragment = fragment; this.conf = conf; this.factory = new CompressionCodecFactory(conf); @@ -115,13 +116,13 @@ public void init() throws IOException { channel.position(startOffset); is = inputStream; lineReader = new ByteBufLineReader(new LocalFileInputChannel(inputStream), - BufferPool.directBuffer((int) Math.min(bufferSize, end))); + BufferPool.directBuffer((int) Math.min(bufferSize, fragment.getLength()))); } else { fis = fs.open(fragment.getPath()); fis.seek(startOffset); is = fis; lineReader = new ByteBufLineReader(new FSDataInputChannel(fis), - BufferPool.directBuffer((int) Math.min(bufferSize, end))); + BufferPool.directBuffer((int) Math.min(bufferSize, fragment.getLength()))); } } eof = false; @@ -145,7 +146,7 @@ public long getCompressedPosition() throws IOException { return retVal; } - public long getUnCompressedPosition() throws IOException { 
+ public long getUncompressedPosition() throws IOException { return pos; } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index d52f46da69..08ce7bd71e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -348,59 +348,52 @@ public Tuple next() throws IOException { return null; } - try { - - // this loop will continue until one tuple is build or EOS (end of stream). - do { - long offset = reader.getUnCompressedPosition(); - ByteBuf buf = reader.readLine(); - - // if no more line, then return EOT (end of tuple) - if (buf == null) { - return null; - } - - // If there is no required column, we just read each line - // and then return an empty tuple without parsing line. - if (targets.length == 0) { - recordCount++; - return EmptyTuple.get(); - } + // this loop will continue until one tuple is build or EOS (end of stream). + do { + long offset = reader.getUncompressedPosition(); + ByteBuf buf = reader.readLine(); + + // if no more line, then return EOT (end of tuple) + if (buf == null) { + return null; + } - outTuple.setOffset(offset); + // If there is no required column, we just read each line + // and then return an empty tuple without parsing line. + if (targets.length == 0) { + recordCount++; + return EmptyTuple.get(); + } - try { - deserializer.deserialize(buf, outTuple); - // if a line is read normally, it exits this loop. - break; + outTuple.setOffset(offset); - } catch (TextLineParsingError tae) { + try { + deserializer.deserialize(buf, outTuple); + // if a line is read normally, it exits this loop. 
+ break; - errorNum++; + } catch (TextLineParsingError tae) { - // suppress too many log prints, which probably cause performance degradation - if (errorNum < errorPrintOutMaxNum) { - LOG.warn("Ignore Text Parse Error (" + errorNum + "): ", tae); - } + errorNum++; - // Only when the maximum error torrence limit is set (i.e., errorTorrenceMaxNum >= 0), - // it checks if the number of parsing error exceeds the max limit. - // Otherwise, it will ignore all parsing errors. - if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) { - throw tae; - } + // suppress too many log prints, which probably cause performance degradation + if (errorNum < errorPrintOutMaxNum) { + LOG.warn("Ignore Text Parse Error (" + errorNum + "): ", tae); } - } while (reader.isReadable()); // continue until EOS - // recordCount means the number of actual read records. We increment the count here. - recordCount++; + // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0), + // it checks if the number of parsing error exceeds the max limit. + // Otherwise, it will ignore all parsing errors. + if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) { + throw new IOException(tae); + } + } + } while (reader.isReadable()); // continue until EOS - return outTuple; + // recordCount means the number of actual read records. We increment the count here. + recordCount++; - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); - } + return outTuple; } @Override @@ -453,8 +446,8 @@ public void close() throws IOException { } if (reader != null) { - tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); + inputStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. 
(decompressed bytes + overhead) + inputStats.setNumRows(recordCount); } if (LOG.isDebugEnabled()) { @@ -488,17 +481,17 @@ public boolean isSplittable() { @Override public TableStats getInputStats() { - if (tableStats != null && reader != null) { - tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - tableStats.setNumBytes(fragment.getLength()); + if (inputStats != null && reader != null) { + inputStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) + inputStats.setNumRows(recordCount); + inputStats.setNumBytes(fragment.getLength()); } - return tableStats; + return inputStats; } @Override public long getNextOffset() throws IOException { - return reader.getUnCompressedPosition(); + return reader.getUncompressedPosition(); } @Override diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java index a29e86bdd0..6faeaf6034 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java @@ -32,7 +32,7 @@ import org.apache.tajo.catalog.SchemaBuilder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.AbstractFileFragment; import org.apache.tajo.storage.thirdparty.orc.TreeReaderFactory.DatumTreeReader; import java.io.Closeable; @@ -71,7 +71,7 @@ public OrcRecordReader(List stripes, FileSystem fileSystem, Schema schema, Column[] targets, - FileFragment fragment, + AbstractFileFragment fragment, List types, CompressionCodec codec, int bufferSize, diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java index fb095a40bf..9b712a818e 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java @@ -224,11 +224,12 @@ public Column apply(@Nullable Pair columnPair) { final TableStats stats = new TableStats(); stats.setNumRows(-1); // unknown + TableMeta meta = new TableMeta("rowstore", new KeyValueSet()); final TableDesc table = new TableDesc( IdentifierUtil.buildFQName(databaseName, name), schema, - new TableMeta("rowstore", new KeyValueSet()), - space.getTableUri(databaseName, name) + meta, + space.getTableUri(meta, databaseName, name) ); table.setStats(stats); diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index c3538316c1..21bce9c416 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -110,12 +110,12 @@ protected void storageInit() throws IOException { } @Override - public long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException { - throw new UnsupportedException(); + public long getTableVolume(TableDesc table, Optional filter) { + throw new TajoRuntimeException(new UnsupportedException()); } @Override - public URI getTableUri(String databaseName, String tableName) { + public URI getTableUri(TableMeta meta, String databaseName, String tableName) { return URI.create(UriUtil.addParam(getUri().toASCIIString(), URI_PARAM_KEY_TABLE, tableName)); } diff --git 
a/tajo-tablespace-example/pom.xml b/tajo-tablespace-example/pom.xml new file mode 100644 index 0000000000..7e15e0c14f --- /dev/null +++ b/tajo-tablespace-example/pom.xml @@ -0,0 +1,256 @@ + + + + + + tajo-project + org.apache.tajo + 0.12.0-SNAPSHOT + ../tajo-project/pom.xml + + 4.0.0 + + tajo-tablespace-example + jar + Tajo Tablespace Example + + UTF-8 + UTF-8 + + + + + + org.apache.rat + apache-rat-plugin + + + derby.log + src/test/resources/dataset/** + src/test/resources/queries/** + src/test/resources/results/** + + + + + verify + + check + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + true + + -Xms128m -Xmx1024m -Dfile.encoding=UTF-8 + + + + org.apache.maven.plugins + maven-antrun-plugin + + + create-protobuf-generated-sources-directory + initialize + + + + + + + run + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2 + + + generate-sources + generate-sources + + protoc + + -Isrc/main/proto/ + --proto_path=../tajo-common/src/main/proto + --proto_path=../tajo-catalog/tajo-catalog-common/src/main/proto + --java_out=target/generated-sources/proto + src/main/proto/ExampleHttpFragmentProtos.proto + + + + exec + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.5 + + + add-source + generate-sources + + add-source + + + + target/generated-sources/proto + + + + + + + + + + + org.apache.tajo + tajo-common + provided + + + org.apache.tajo + tajo-catalog-common + provided + + + org.apache.tajo + tajo-plan + provided + + + org.apache.tajo + tajo-storage-common + provided + + + org.apache.tajo + tajo-storage-hdfs + provided + + + org.apache.tajo + tajo-rpc-common + + + org.apache.tajo + tajo-cluster-tests + test-jar + test + + + junit + junit + test + + + io.netty + netty-transport + test + + + io.netty + netty-codec-http + test + + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-minicluster + test + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + 
org.mortbay.jetty + jsp-2.1-jetty + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + + + netty-all + io.netty + + + + + org.apache.hadoop + hadoop-hdfs + test + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + + + netty-all + io.netty + + + + + + \ No newline at end of file diff --git a/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileFragment.java b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileFragment.java new file mode 100644 index 0000000000..86ebc10257 --- /dev/null +++ b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileFragment.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.http; + +import org.apache.tajo.storage.fragment.AbstractFileFragment; +import org.apache.tajo.storage.fragment.BuiltinFragmentKinds; + +import java.net.URI; + +public class ExampleHttpFileFragment extends AbstractFileFragment { + + public ExampleHttpFileFragment(URI uri, + String inputSourceId, + long startKey, + long endKey) { + super(BuiltinFragmentKinds.HTTP, uri, inputSourceId, startKey, endKey, endKey - startKey, null); + } +} diff --git a/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileFragmentSerde.java b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileFragmentSerde.java new file mode 100644 index 0000000000..866526922d --- /dev/null +++ b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileFragmentSerde.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.http; + +import com.google.protobuf.GeneratedMessage.Builder; +import org.apache.tajo.storage.fragment.FragmentSerde; +import org.apache.tajo.storage.http.ExampleHttpFragmentProtos.ExampleHttpFileFragmentProto; + +import java.net.URI; + +public class ExampleHttpFileFragmentSerde + implements FragmentSerde { + + @Override + public Builder newBuilder() { + return ExampleHttpFileFragmentProto.newBuilder(); + } + + @Override + public ExampleHttpFileFragmentProto serialize(ExampleHttpFileFragment fragment) { + return ExampleHttpFileFragmentProto.newBuilder() + .setUri(fragment.getUri().toASCIIString()) + .setTableName(fragment.getInputSourceId()) + .setStartKey(fragment.getStartKey()) + .setEndKey(fragment.getEndKey()) + .build(); + } + + @Override + public ExampleHttpFileFragment deserialize(ExampleHttpFileFragmentProto proto) { + return new ExampleHttpFileFragment( + URI.create(proto.getUri()), + proto.getTableName(), + proto.getStartKey(), + proto.getEndKey() + ); + } +} diff --git a/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileTablespace.java b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileTablespace.java new file mode 100644 index 0000000000..258d505952 --- /dev/null +++ b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpFileTablespace.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.http; + +import com.google.common.collect.Lists; +import net.minidev.json.JSONObject; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.FormatProperty; +import org.apache.tajo.storage.StorageProperty; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TupleRange; +import org.apache.tajo.storage.fragment.Fragment; +import org.jboss.netty.handler.codec.http.HttpHeaders.Names; + +import javax.annotation.Nullable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.Optional; + +/** + * Example read-only tablespace for HTTP protocol. + * + * An example table can be created by using the following SQL query. 
+ * + * CREATE TABLE http_test (*) TABLESPACE http_example USING ex_http_json WITH ('path'='2015-01-01-15.json.gz', + * 'compression.codec'='org.apache.hadoop.io.compress.GzipCodec'); + */ +public class ExampleHttpFileTablespace extends Tablespace { + private static final Log LOG = LogFactory.getLog(ExampleHttpFileTablespace.class); + + static final String PATH = "path"; + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Tablespace properties + ////////////////////////////////////////////////////////////////////////////////////////////////// + private static final StorageProperty STORAGE_PROPERTY = + new StorageProperty( + BuiltinStorages.JSON, // default format is json + false, // is not movable + false, // is not writable + true, // allow arbitrary path + false // doesn't provide metadata + ); + + private static final FormatProperty FORMAT_PROPERTY = + new FormatProperty( + false, // doesn't support insert + false, // doesn't support direct insert + false // doesn't support result staging + ); + + public ExampleHttpFileTablespace(String name, URI uri, JSONObject config) { + super(name, uri, config); + + LOG.info("ExampleHttpFileTablespace is initialized for " + uri); + } + + @Override + protected void storageInit() throws IOException { + // Add initialization code for your tablespace + } + + @Override + public long getTableVolume(TableDesc table, Optional notUsed) { + HttpURLConnection connection = null; + + try { + connection = (HttpURLConnection) new URL(table.getUri().toASCIIString()).openConnection(); + connection.setRequestMethod("HEAD"); + connection.connect(); + return connection.getHeaderFieldLong(Names.CONTENT_LENGTH, -1); + + } catch (IOException e) { + throw new TajoInternalError(e); + + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + @Override + public URI getRootUri() { + return uri; + } + + @Override + public URI getTableUri(TableMeta meta, String 
databaseName, String tableName) { + String tablespaceUriString = uri.toASCIIString(); + String tablePath = meta.getProperty(PATH); + + if (!tablespaceUriString.endsWith("/") && !tablePath.startsWith("/")) { + tablePath = "/" + tablePath; + } + + return URI.create(tablespaceUriString + tablePath); + } + + @Override + public List getSplits(String inputSourceId, + TableDesc tableDesc, + boolean requireSort, + @Nullable EvalNode filterCondition) + throws IOException, TajoException { + + // getSplits() should return multiple fragments for distributed processing of a large data. + // This example tablespace returns only one fragment for the whole data for simplicity, + // but this may significantly increase the query processing time. + + long tableVolume = getTableVolume(tableDesc, Optional.empty()); + return Lists.newArrayList(new ExampleHttpFileFragment(tableDesc.getUri(), inputSourceId, 0, tableVolume)); + } + + @Override + public StorageProperty getProperty() { + return STORAGE_PROPERTY; + } + + @Override + public FormatProperty getFormatProperty(TableMeta meta) { + return FORMAT_PROPERTY; + } + + @Override + public void close() { + // do nothing + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, + TableDesc tableDesc, + Schema inputSchema, + SortSpec[] sortSpecs, + TupleRange dataRange) throws IOException { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws TajoException { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void createTable(TableDesc table, boolean ifNotExists) throws TajoException, IOException { + HttpURLConnection connection = null; + + try { + connection = (HttpURLConnection) new URL(table.getUri().toASCIIString()).openConnection(); + connection.setRequestMethod("HEAD"); + connection.connect(); + + if (connection.getResponseCode() == 404) { + throw new 
FileNotFoundException(); + } + + } catch (IOException e) { + throw new TajoInternalError(e); + + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException, TajoException { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void prepareTable(LogicalNode node) throws IOException, TajoException { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public Path commitTable(OverridableConf queryContext, + ExecutionBlockId finalEbId, + LogicalPlan plan, + Schema schema, + TableDesc tableDesc) throws IOException { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public void rollbackTable(LogicalNode node) throws IOException, TajoException { + throw new TajoRuntimeException(new UnsupportedException()); + } + + @Override + public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { + throw new TajoRuntimeException(new UnsupportedException()); + } +} diff --git a/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpJsonLineReader.java b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpJsonLineReader.java new file mode 100644 index 0000000000..61e4d6c377 --- /dev/null +++ b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpJsonLineReader.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.http;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.fragment.AbstractFileFragment;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Reads the HTTP resource referenced by an {@link ExampleHttpFileFragment} line by line.
+ *
+ * <p>{@link #init()} opens the connection and, when a compression codec was resolved from the
+ * fragment path, wraps the response stream in a decompressing stream. {@link #readLine()} then
+ * returns one line per call until the stream is exhausted.</p>
+ */
+public class ExampleHttpJsonLineReader implements Closeable {
+
+  private final static Log LOG = LogFactory.getLog(ExampleHttpJsonLineReader.class);
+
+  private Configuration conf;
+
+  private HttpURLConnection connection;
+  private InputStream is;
+
+  private CompressionCodec codec;
+  private Decompressor decompressor;
+
+  private long startOffset, endOffset, pos;
+  private boolean eof = true;
+  private ByteBufLineReader lineReader;
+  private AtomicInteger lineReadBytes = new AtomicInteger();
+  private ExampleHttpFileFragment fragment;
+  private final int bufferSize;
+
+  /**
+   * Creates a reader; no network access happens until {@link #init()} is called.
+   *
+   * @param conf       configuration used to build the {@link CompressionCodecFactory}
+   * @param fragment   fragment to read; it is cast to {@link ExampleHttpFileFragment}
+   * @param bufferSize size of the read buffer in bytes
+   * @throws TajoRuntimeException wrapping an {@link UnsupportedException} when the codec resolved
+   *                              for the fragment path is a {@link SplittableCompressionCodec}
+   */
+  public ExampleHttpJsonLineReader(Configuration conf,
+                                   AbstractFileFragment fragment,
+                                   int bufferSize) {
+    this.conf = conf;
+    this.fragment = (ExampleHttpFileFragment) fragment;
+    this.bufferSize = bufferSize;
+
+    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+    codec = factory.getCodec(fragment.getPath());
+    if (this.codec instanceof SplittableCompressionCodec) {
+      // bzip2 does not support multi-thread model
+      throw new TajoRuntimeException(new UnsupportedException(codec.getDefaultExtension()));
+    }
+  }
+
+  /**
+   * Opens the HTTP connection and prepares the line reader.
+   *
+   * <p>If any step after opening the connection fails, everything acquired so far (read buffer,
+   * decompressor, connection) is released again and the reader is left uninitialized, so the
+   * failure neither leaks resources nor blocks a later {@code init()} attempt.</p>
+   *
+   * @throws IOException if the reader is already initialized or the resource cannot be opened
+   */
+  public void init() throws IOException {
+    if (connection != null) {
+      throw new IOException(this.getClass() + " is already initialized");
+    }
+
+    pos = startOffset = fragment.getStartKey();
+    endOffset = fragment.getEndKey();
+
+    URL url = new URL(fragment.getUri().toASCIIString());
+    connection = (HttpURLConnection) url.openConnection();
+
+    ByteBuf buf = null;
+    try {
+      is = connection.getInputStream();
+
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        is = codec.createInputStream(is, decompressor);
+
+        buf = BufferPool.directBuffer(bufferSize);
+
+      } else {
+        buf = BufferPool.directBuffer((int) Math.min(bufferSize, fragment.getLength()));
+
+      }
+
+      lineReader = new ByteBufLineReader(new ByteBufInputChannel(is), buf);
+
+      eof = false;
+
+    } catch (IOException | RuntimeException e) {
+      // The line reader was never created, so nothing else owns the buffer yet.
+      if (buf != null) {
+        buf.release();
+      }
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+      connection.disconnect();
+      // Reset the "already initialized" marker so that init() may be retried.
+      connection = null;
+      is = null;
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the next line.
+   *
+   * @return the buffer returned by the underlying line reader, or {@code null} once the end of
+   *         the stream has been reached
+   * @throws IOException if reading from the stream fails
+   */
+  public ByteBuf readLine() throws IOException {
+    if (eof) {
+      return null;
+    }
+
+    ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
+    pos += lineReadBytes.get();
+    if (buf == null) {
+      eof = true;
+    }
+
+    return buf;
+  }
+
+  /** @return {@code true} if a compression codec was resolved for the fragment path */
+  public boolean isCompressed() {
+    return codec != null;
+  }
+
+  /** @return the fragment start key plus the byte counts reported by the line reader so far */
+  public long getPos() {
+    return pos;
+  }
+
+  /** @return the number of bytes consumed since the fragment start key */
+  public long getReadBytes() {
+    return pos - startOffset;
+  }
+
+  /** @return {@code true} before {@link #init()} and after the last line has been consumed */
+  public boolean isEof() {
+    return eof;
+  }
+
+  /**
+   * Closes the line reader, disconnects from the server and hands the decompressor back to the
+   * {@link CodecPool}.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      IOUtils.cleanup(LOG, lineReader);
+
+      if (connection != null) {
+        connection.disconnect();
+      }
+
+      is = null;
+      lineReader = null;
+
+    } finally {
+      CodecPool.returnDecompressor(decompressor);
+      decompressor = null;
+    }
+  }
+}
diff --git a/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpJsonScanner.java b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpJsonScanner.java
new file mode 100644
index 0000000000..6c41a50abe
--- /dev/null
+++ b/tajo-tablespace-example/src/main/java/org/apache/tajo/storage/http/ExampleHttpJsonScanner.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.http;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.storage.EmptyTuple;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.json.JsonLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.IOException;
+
+import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
+import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
+import static org.apache.tajo.storage.text.DelimitedTextFile.READ_BUFFER_SIZE;
+
+/**
+ * Scanner that reads line-delimited JSON through an {@link ExampleHttpJsonLineReader} and turns
+ * every line into a tuple with a {@link JsonLineDeserializer}.
+ *
+ * <p>Lines that fail to parse are skipped until more than {@code maxAllowedErrorCount} of them
+ * have been seen (a negative limit disables the check).</p>
+ */
+public class ExampleHttpJsonScanner extends FileScanner {
+
+  private static final Log LOG = LogFactory.getLog(ExampleHttpJsonScanner.class);
+
+  private VTuple outTuple;
+
+  private long limit;
+
+  private final long startOffset;
+  private final long endOffset;
+
+  private ExampleHttpJsonLineReader reader;
+  private JsonLineDeserializer deserializer;
+
+  /** Maximum number of parse errors that are written to the log */
+  private int errorPrintOutMaxNum = 5;
+  /** Maximum number of permissible errors */
+  private final int maxAllowedErrorCount;
+  /** How many errors have occurred? */
+  private int errorNum;
+
+  private boolean splittable = false;
+
+  private long recordCount = 0;
+
+  /**
+   * Creates the scanner and its line reader; the connection is opened later by {@link #init()}.
+   *
+   * @param conf      configuration; {@code READ_BUFFER_SIZE} (default 128 KB) sizes the read buffer
+   * @param schema    table schema
+   * @param tableMeta table properties; {@code TEXT_ERROR_TOLERANCE_MAXNUM} sets the error limit
+   * @param fragment  fragment to scan
+   * @throws IOException declared for callers; see the super constructor
+   */
+  public ExampleHttpJsonScanner(Configuration conf, Schema schema, TableMeta tableMeta, Fragment fragment)
+      throws IOException {
+    super(conf, schema, tableMeta, fragment);
+
+    reader = new ExampleHttpJsonLineReader(conf, this.fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
+    if (!this.reader.isCompressed()) {
+      splittable = true;
+    }
+
+    startOffset = this.fragment.getStartKey();
+    endOffset = this.fragment.getEndKey();
+
+    maxAllowedErrorCount =
+        Integer.parseInt(tableMeta.getProperty(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
+  }
+
+  /** Opens the reader, defaults the projection to the whole schema and resets the scan state. */
+  @Override
+  public void init() throws IOException {
+
+    reader.init();
+
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+
+    reset();
+
+    super.init();
+  }
+
+  /**
+   * Returns the next tuple.
+   *
+   * @return the parsed tuple, an empty tuple when no column is projected, or {@code null} when
+   *         there is no more data
+   * @throws IOException if reading fails or the number of parse errors exceeds the allowed limit
+   */
+  @Override
+  public Tuple next() throws IOException {
+
+    if (reader.isEof()) {
+      return null; // Indicate to the parent operator that there is no more data.
+    }
+
+    // Read lines until it reads a valid tuple or EOS (end of stream).
+    while (!reader.isEof()) {
+
+      ByteBuf buf = reader.readLine();
+
+      if (buf == null) { // A null buf means that there are no more lines.
+        return null;
+      }
+
+      // When the number of projection columns is 0, the read line doesn't have to be parsed.
+      if (targets.length == 0) {
+        recordCount++;
+        return EmptyTuple.get();
+      }
+
+      try {
+        deserializer.deserialize(buf, outTuple);
+
+        // Once a line is normally parsed, exits the while loop.
+        break;
+
+      } catch (TextLineParsingError tlpe) {
+
+        errorNum++;
+
+        // Log only the first errorPrintOutMaxNum errors; logging every bad line floods the log.
+        if (errorNum <= errorPrintOutMaxNum) {
+          LOG.warn("Ignore Text Parse Error (" + errorNum + "): ", tlpe);
+        }
+
+        // If the number of found errors exceeds the configured tolerable error count,
+        // throw the error.
+        if (maxAllowedErrorCount >= 0 && errorNum > maxAllowedErrorCount) {
+          throw new IOException(tlpe);
+        }
+      }
+    }
+
+    recordCount++;
+
+    return outTuple;
+  }
+
+  /**
+   * Restarts the scan: reopens the reader if it already consumed data, recreates the deserializer
+   * and the output tuple, and skips the first line when the scan starts at a non-zero offset.
+   */
+  @Override
+  public void reset() throws IOException {
+    recordCount = 0;
+
+    if (reader.getReadBytes() > 0) {
+      reader.close();
+
+      reader = new ExampleHttpJsonLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
+      reader.init();
+    }
+
+    if (deserializer != null) {
+      deserializer.release();
+    }
+
+    deserializer = new JsonLineDeserializer(schema, meta, targets);
+    deserializer.init();
+
+    outTuple = new VTuple(targets.length);
+
+    // skip first line if it reads from middle of file
+    if (startOffset > 0) {
+      reader.readLine();
+    }
+  }
+
+  /** Releases the deserializer, records the final statistics and closes the reader. */
+  @Override
+  public void close() throws IOException {
+    try {
+
+      if (deserializer != null) {
+        deserializer.release();
+      }
+
+      // Same guard as getInputStats(): skip the statistics update when inputStats is null.
+      if (inputStats != null && reader != null) {
+        inputStats.setReadBytes(reader.getReadBytes());
+        inputStats.setNumRows(recordCount);
+      }
+
+    } finally {
+      IOUtils.cleanup(LOG, reader);
+      outTuple = null;
+    }
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+    this.targets = targets;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  /** Filter push-down is not supported; always throws. */
+  @Override
+  public void setFilter(EvalNode filter) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public void setLimit(long num) {
+    this.limit = num;
+  }
+
+  /** @return {@code true} when the underlying reader is not compressed */
+  @Override
+  public boolean isSplittable() {
+    return splittable;
+  }
+
+  /**
+   * @return the fraction of the range between the start and end offsets consumed so far, capped
+   *         at 1.0; 1.0 once the reader reached the end of the stream
+   */
+  @Override
+  public float getProgress() {
+    if (!inited) return super.getProgress();
+
+    if (reader.isEof()) { // if the reader reaches EOS
+      return 1.0f;
+    }
+
+    long currentPos = reader.getPos();
+    long readBytes = currentPos - startOffset;
+    long remainingBytes = Math.max(endOffset - currentPos, 0);
+    long totalBytes = readBytes + remainingBytes;
+    if (totalBytes == 0) {
+      // Nothing read and nothing left: avoid 0/0, which would yield NaN.
+      return 0.0f;
+    }
+    return Math.min(1.0f, (float) (readBytes) / (float) (totalBytes));
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (inputStats != null && reader != null) {
+      inputStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead)
+      inputStats.setNumRows(recordCount);
+      inputStats.setNumBytes(fragment.getLength());
+    }
+
+    return inputStats;
+  }
+}
diff --git a/tajo-tablespace-example/src/main/proto/ExampleHttpFragmentProtos.proto b/tajo-tablespace-example/src/main/proto/ExampleHttpFragmentProtos.proto
new file mode 100644
index 0000000000..586e46451d
--- /dev/null
+++ b/tajo-tablespace-example/src/main/proto/ExampleHttpFragmentProtos.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.http";
+option java_outer_classname = "ExampleHttpFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+// Serialized form of one fragment of an HTTP-hosted file.
+// NOTE(review): presumably the wire format of ExampleHttpFileFragment - confirm against that class.
+message ExampleHttpFileFragmentProto {
+  // Location of the resource. Presumably the URI the line reader opens - TODO confirm.
+  required string uri = 1;
+  // Name of the table the fragment belongs to.
+  required string table_name = 2;
+  // Start of the fragment. Presumably a byte offset, as the reader uses the fragment's start key - TODO confirm.
+  required int64 start_key = 3;
+  // End of the fragment. Presumably a byte offset, as the reader uses the fragment's end key - TODO confirm.
+  required int64 end_key = 4;
+}
\ No newline at end of file
diff --git a/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpServerHandler.java b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpServerHandler.java
new file mode 100644
index 0000000000..fd25da8975
--- /dev/null
+++ b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpServerHandler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.http;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.activation.MimetypesFileTypeMap;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Test-only HTTP handler that serves files found under the {@code dataset/} classpath directory.
+ *
+ * <p>{@code HEAD} answers with the file length, {@code GET} streams the file and then closes the
+ * connection; every other method, and every failure, is answered with {@code 400 Bad Request}.</p>
+ */
+public class ExampleHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+  private static final Log LOG = LogFactory.getLog(ExampleHttpServerHandler.class);
+
+  /** Dispatches the request by HTTP method. */
+  @Override
+  protected void channelRead0(ChannelHandlerContext context, FullHttpRequest request) throws Exception {
+
+    if (request.getMethod().equals(HttpMethod.HEAD)) {
+
+      processHead(context, request);
+
+    } else if (request.getMethod().equals(HttpMethod.GET)) {
+
+      processGet(context, request);
+
+    } else {
+      // error
+      String msg = "Not supported method: " + request.getMethod();
+      LOG.error(msg);
+      context.writeAndFlush(getBadRequest(msg));
+    }
+  }
+
+  /**
+   * Answers a {@code HEAD} request with the length of the requested file.
+   *
+   * <p>A request without a {@code Content-Length} header is rejected with a 400 response;
+   * previously {@code null} was handed to {@code writeAndFlush} in that case.</p>
+   */
+  private void processHead(ChannelHandlerContext context, FullHttpRequest request) {
+    HttpHeaders headers = request.headers();
+    FullHttpResponse response;
+
+    if (headers.contains(Names.CONTENT_LENGTH)) {
+
+      try {
+        File file = getRequestedFile(request.getUri());
+
+        response = new DefaultFullHttpResponse(
+            HTTP_1_1,
+            request.getDecoderResult().isSuccess() ? OK : BAD_REQUEST
+        );
+
+        HttpHeaders.setContentLength(response, file.length());
+
+      } catch (FileNotFoundException | URISyntaxException e) {
+        response = getBadRequest(e.getMessage());
+      }
+
+    } else {
+      response = getBadRequest("Missing " + Names.CONTENT_LENGTH + " header");
+    }
+
+    context.writeAndFlush(response);
+  }
+
+  /** Streams the requested file to the client and closes the connection afterwards. */
+  private void processGet(ChannelHandlerContext context, FullHttpRequest request) {
+    RandomAccessFile raf = null;
+    try {
+      File file = getRequestedFile(request.getUri());
+
+      raf = new RandomAccessFile(file, "r");
+      long fileLength = raf.length();
+
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      HttpHeaders.setContentLength(response, fileLength);
+      setContentTypeHeader(response, file);
+
+      context.write(response);
+
+      context.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength));
+
+      // Write the end marker.
+      ChannelFuture future = context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+      future.addListener(ChannelFutureListener.CLOSE);
+
+    } catch (IOException | URISyntaxException e) {
+      // The file region has not been written on this path, so nothing else will close the file.
+      if (raf != null) {
+        try {
+          raf.close();
+        } catch (IOException ignored) {
+          // The request already failed; the original error is the one reported to the client.
+        }
+      }
+      context.writeAndFlush(getBadRequest(e.getMessage()));
+    }
+  }
+
+  /**
+   * Resolves the path of a request URI to a file below the {@code dataset/} classpath directory.
+   *
+   * @param uri request URI
+   * @return the file backing the classpath resource
+   * @throws FileNotFoundException if the URI has no path, the path contains {@code ..}, or no such
+   *                               resource exists
+   * @throws URISyntaxException    if the resource URL cannot be converted to a URI
+   */
+  private static File getRequestedFile(String uri) throws FileNotFoundException, URISyntaxException {
+    String path = URI.create(uri).getPath();
+
+    // Never let a request climb out of the dataset directory.
+    if (path == null || path.contains("..")) {
+      throw new FileNotFoundException(uri);
+    }
+
+    URL url = ClassLoader.getSystemResource("dataset/" + path);
+
+    if (url == null) {
+      throw new FileNotFoundException(uri);
+    }
+    return new File(url.toURI());
+  }
+
+  /**
+   * Builds a {@code 400 Bad Request} response carrying the message as its UTF-8 body.
+   *
+   * @param message error text; {@code null} is rendered as the string {@code "null"}
+   */
+  private static FullHttpResponse getBadRequest(String message) {
+    FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST,
+        Unpooled.copiedBuffer(String.valueOf(message), CharsetUtil.UTF_8));
+    // Without a length the client cannot tell where the body ends on a connection that stays open.
+    HttpHeaders.setContentLength(response, response.content().readableBytes());
+    return response;
+  }
+
+  /** Logs the failure and closes the channel if it is still open. */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+    LOG.error(cause.getMessage(), cause);
+    if (context.channel().isOpen()) {
+      context.channel().close();
+    }
+  }
+
+  /**
+   * Sets the content type header for the HTTP Response
+   * @param response HTTP response
+   * @param file file to extract content type
+   */
+  private static void setContentTypeHeader(HttpResponse response, File file) {
+    MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
+    response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
+  }
+}
diff --git a/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpServerInitializer.java b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpServerInitializer.java
new file mode 100644
index 0000000000..2cd6340ade
--- /dev/null
+++ b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpServerInitializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.http;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * Builds the pipeline of every accepted connection of the example HTTP test server: HTTP codec,
+ * request aggregation, chunked writes, and finally {@link ExampleHttpServerHandler}.
+ */
+public class ExampleHttpServerInitializer extends ChannelInitializer<SocketChannel> {
+
+  /** Value passed to the {@link HttpObjectAggregator} constructor. */
+  private static final int MAX_CONTENT_LENGTH = 65536;
+
+  /**
+   * Installs the handlers on a newly accepted channel.
+   *
+   * @param socketChannel the accepted channel
+   */
+  @Override
+  protected void initChannel(SocketChannel socketChannel) throws Exception {
+    ChannelPipeline pipeline = socketChannel.pipeline();
+
+    pipeline.addLast(new HttpServerCodec());
+    pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
+    pipeline.addLast(new ChunkedWriteHandler());
+    pipeline.addLast(new ExampleHttpServerHandler());
+  }
+}
diff --git a/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpTablespaceTestServer.java b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpTablespaceTestServer.java
new file mode 100644
index 0000000000..aa7acc85d8
--- /dev/null
+++ b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/ExampleHttpTablespaceTestServer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.http;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+
+/**
+ * Embedded Netty HTTP server used by the example tablespace tests. It binds to port 0, so the
+ * actual port has to be obtained from {@link #getAddress()}.
+ */
+public class ExampleHttpTablespaceTestServer implements Closeable {
+
+  private final static Log LOG = LogFactory.getLog(ExampleHttpTablespaceTestServer.class);
+
+  private ServerBootstrap bootstrap;
+  private Channel channel;
+
+  /**
+   * Starts the server with a single event loop thread and waits until it is bound.
+   *
+   * <p>If binding fails, the event loop group created here is shut down again before the failure
+   * propagates, so a failed start does not leave its thread behind.</p>
+   *
+   * @throws InterruptedException if interrupted while waiting for the bind to complete
+   */
+  public void init() throws InterruptedException {
+    EventLoopGroup group = new NioEventLoopGroup(1);
+
+    bootstrap = new ServerBootstrap();
+    bootstrap.group(group)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ExampleHttpServerInitializer());
+
+    boolean bound = false;
+    try {
+      channel = bootstrap.bind(0).sync().channel();
+      bound = true;
+    } finally {
+      if (!bound) {
+        group.shutdownGracefully();
+      }
+    }
+
+    LOG.info(ExampleHttpTablespaceTestServer.class.getSimpleName() + " listening on port " + getAddress().getPort());
+  }
+
+  /** @return the local address the server channel is bound to; valid only after {@link #init()} */
+  public InetSocketAddress getAddress() {
+    return (InetSocketAddress) channel.localAddress();
+  }
+
+  /**
+   * Stops accepting connections and shuts the event loop group down. Neither step is awaited.
+   */
+  @Override
+  public void close() {
+    // Close the listening channel first, then the event loop that serves it.
+    if (channel != null) {
+      channel.close();
+    }
+
+    if (bootstrap != null) {
+      if (bootstrap.group() != null) {
+        bootstrap.group().shutdownGracefully();
+      }
+    }
+  }
+}
diff --git a/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/TestExampleHttpFileQuery.java b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/TestExampleHttpFileQuery.java
new file mode 100644
index 0000000000..89353f8aa0
--- /dev/null
+++ b/tajo-tablespace-example/src/test/java/org/apache/tajo/storage/http/TestExampleHttpFileQuery.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.http;
+
+import net.minidev.json.JSONObject;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.storage.TablespaceManager;
+import org.junit.*;
+
+import java.net.InetAddress;
+import java.net.URI;
+
+/**
+ * Query tests that run against tables stored in the example HTTP tablespace. The data is served
+ * by an {@link ExampleHttpTablespaceTestServer} started once for the whole class.
+ */
+public class TestExampleHttpFileQuery extends QueryTestCaseBase {
+
+  private static ExampleHttpTablespaceTestServer server;
+
+  /** Starts the HTTP server and registers the {@code http_example} tablespace pointing at it. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    server = new ExampleHttpTablespaceTestServer();
+    server.init();
+
+    final String host = InetAddress.getLocalHost().getHostName();
+    final int port = server.getAddress().getPort();
+    final URI serverUri = URI.create("http://" + host + ":" + port);
+    final JSONObject emptyConfig = new JSONObject();
+
+    TablespaceManager.addTableSpaceForTest(
+        new ExampleHttpFileTablespace("http_example", serverUri, emptyConfig));
+
+    QueryTestCaseBase.testingCluster.getMaster().refresh();
+  }
+
+  /** Stops the HTTP server. */
+  @AfterClass
+  public static void teardown() throws Exception {
+    server.close();
+  }
+
+  /** Creates the {@code got} and {@code github} tables before each test. */
+  @Before
+  public void prepareTables() throws TajoException {
+    executeString("create table got (*) tablespace http_example using ex_http_json with ('path'='got.json')");
+    executeString("create table github (*) tablespace http_example using ex_http_json with ('path'='github.json')");
+  }
+
+  /** Drops both tables after each test. */
+  @After
+  public void dropTables() throws TajoException {
+    executeString("drop table got");
+    executeString("drop table github");
+  }
+
+  @SimpleTest
+  @Test
+  public void testSelect() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testGroupby() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testSort() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testJoin() throws Exception {
+    runSimpleTests();
+  }
+}
diff --git a/tajo-tablespace-example/src/test/resources/dataset/github.json b/tajo-tablespace-example/src/test/resources/dataset/github.json
new file mode 100644
index 0000000000..0b597b2ab4
--- /dev/null
+++ b/tajo-tablespace-example/src/test/resources/dataset/github.json
@@ -0,0 +1,4 @@
+{"id":"2937257753","type":"PushEvent","actor":{"id":5266949,"login":"hardrubic","gravatar_id":"","url":"https://api.github.com/users/hardrubic","avatar_url":"https://avatars.githubusercontent.com/u/5266949?"},"repo":{"id":38299397,"name":"hardrubic/rxJavaTest","url":"https://api.github.com/repos/hardrubic/rxJavaTest"},"payload":{"push_id":712081726,"size":1,"distinct_size":1,"ref":"refs/heads/master","head":"ea79d7a424f2693b70b9496726f315a5711b6fe7","before":"613f05557ad353f4bedc6df54128f8091ed1f1e9","commits":[{"sha":"ea79d7a424f2693b70b9496726f315a5711b6fe7","author":{"email":"dgzx106@163.com","name":"hardrubic"},"message":"增加rxJava例子","distinct":true,"url":"https://api.github.com/repos/hardrubic/rxJavaTest/commits/ea79d7a424f2693b70b9496726f315a5711b6fe7"}]},"public":true,"created_at":"2015-07-01T00:00:01Z"}
+{"id":"2937257758","type":"WatchEvent","actor":{"id":11455393,"login":"chrischjh","gravatar_id":"","url":"https://api.github.com/users/chrischjh","avatar_url":"https://avatars.githubusercontent.com/u/11455393?"},"repo":{"id":18218031,"name":"dead-horse/co-and-koa-talk","url":"https://api.github.com/repos/dead-horse/co-and-koa-talk"},"payload":{"action":"started"},"public":true,"created_at":"2015-07-01T00:00:01Z"} +{"id":"2937257759","type":"CreateEvent","actor":{"id":206379,"login":"gvn","gravatar_id":"","url":"https://api.github.com/users/gvn","avatar_url":"https://avatars.githubusercontent.com/u/206379?"},"repo":{"id":24345476,"name":"gvn/webmaker-android","url":"https://api.github.com/repos/gvn/webmaker-android"},"payload":{"ref":"use-self-building","ref_type":"branch","master_branch":"master","description":"Webmaker for Firefox OS & Android","pusher_type":"user"},"public":true,"created_at":"2015-07-01T00:00:01Z"} +{"id":"2937257761","type":"ForkEvent","actor":{"id":1088854,"login":"CAOakleyII","gravatar_id":"","url":"https://api.github.com/users/CAOakleyII","avatar_url":"https://avatars.githubusercontent.com/u/1088854?"},"repo":{"id":11909954,"name":"skycocker/chromebrew","url":"https://api.github.com/repos/skycocker/chromebrew"},"payload":{"forkee":{"id":38339291,"name":"chromebrew","full_name":"CAOakleyII/chromebrew","owner":{"login":"CAOakleyII","id":1088854,"avatar_url":"https://avatars.githubusercontent.com/u/1088854?v=3","gravatar_id":"","url":"https://api.github.com/users/CAOakleyII","html_url":"https://github.com/CAOakleyII","followers_url":"https://api.github.com/users/CAOakleyII/followers","following_url":"https://api.github.com/users/CAOakleyII/following{/other_user}","gists_url":"https://api.github.com/users/CAOakleyII/gists{/gist_id}","starred_url":"https://api.github.com/users/CAOakleyII/starred{/owner}{/repo}","subscriptions_url":"https://api.github.com/users/CAOakleyII/subscriptions","organizations_url":"https://api.github.com/users/CAOakleyII/o
rgs","repos_url":"https://api.github.com/users/CAOakleyII/repos","events_url":"https://api.github.com/users/CAOakleyII/events{/privacy}","received_events_url":"https://api.github.com/users/CAOakleyII/received_events","type":"User","site_admin":false},"private":false,"html_url":"https://github.com/CAOakleyII/chromebrew","description":"Package manager for Chrome OS","fork":true,"url":"https://api.github.com/repos/CAOakleyII/chromebrew","forks_url":"https://api.github.com/repos/CAOakleyII/chromebrew/forks","keys_url":"https://api.github.com/repos/CAOakleyII/chromebrew/keys{/key_id}","collaborators_url":"https://api.github.com/repos/CAOakleyII/chromebrew/collaborators{/collaborator}","teams_url":"https://api.github.com/repos/CAOakleyII/chromebrew/teams","hooks_url":"https://api.github.com/repos/CAOakleyII/chromebrew/hooks","issue_events_url":"https://api.github.com/repos/CAOakleyII/chromebrew/issues/events{/number}","events_url":"https://api.github.com/repos/CAOakleyII/chromebrew/events","assignees_url":"https://api.github.com/repos/CAOakleyII/chromebrew/assignees{/user}","branches_url":"https://api.github.com/repos/CAOakleyII/chromebrew/branches{/branch}","tags_url":"https://api.github.com/repos/CAOakleyII/chromebrew/tags","blobs_url":"https://api.github.com/repos/CAOakleyII/chromebrew/git/blobs{/sha}","git_tags_url":"https://api.github.com/repos/CAOakleyII/chromebrew/git/tags{/sha}","git_refs_url":"https://api.github.com/repos/CAOakleyII/chromebrew/git/refs{/sha}","trees_url":"https://api.github.com/repos/CAOakleyII/chromebrew/git/trees{/sha}","statuses_url":"https://api.github.com/repos/CAOakleyII/chromebrew/statuses/{sha}","languages_url":"https://api.github.com/repos/CAOakleyII/chromebrew/languages","stargazers_url":"https://api.github.com/repos/CAOakleyII/chromebrew/stargazers","contributors_url":"https://api.github.com/repos/CAOakleyII/chromebrew/contributors","subscribers_url":"https://api.github.com/repos/CAOakleyII/chromebrew/subscribers","subscription_url":"h
ttps://api.github.com/repos/CAOakleyII/chromebrew/subscription","commits_url":"https://api.github.com/repos/CAOakleyII/chromebrew/commits{/sha}","git_commits_url":"https://api.github.com/repos/CAOakleyII/chromebrew/git/commits{/sha}","comments_url":"https://api.github.com/repos/CAOakleyII/chromebrew/comments{/number}","issue_comment_url":"https://api.github.com/repos/CAOakleyII/chromebrew/issues/comments{/number}","contents_url":"https://api.github.com/repos/CAOakleyII/chromebrew/contents/{+path}","compare_url":"https://api.github.com/repos/CAOakleyII/chromebrew/compare/{base}...{head}","merges_url":"https://api.github.com/repos/CAOakleyII/chromebrew/merges","archive_url":"https://api.github.com/repos/CAOakleyII/chromebrew/{archive_format}{/ref}","downloads_url":"https://api.github.com/repos/CAOakleyII/chromebrew/downloads","issues_url":"https://api.github.com/repos/CAOakleyII/chromebrew/issues{/number}","pulls_url":"https://api.github.com/repos/CAOakleyII/chromebrew/pulls{/number}","milestones_url":"https://api.github.com/repos/CAOakleyII/chromebrew/milestones{/number}","notifications_url":"https://api.github.com/repos/CAOakleyII/chromebrew/notifications{?since,all,participating}","labels_url":"https://api.github.com/repos/CAOakleyII/chromebrew/labels{/name}","releases_url":"https://api.github.com/repos/CAOakleyII/chromebrew/releases{/id}","created_at":"2015-07-01T00:00:00Z","updated_at":"2015-06-28T10:11:09Z","pushed_at":"2015-06-09T07:46:57Z","git_url":"git://github.com/CAOakleyII/chromebrew.git","ssh_url":"git@github.com:CAOakleyII/chromebrew.git","clone_url":"https://github.com/CAOakleyII/chromebrew.git","svn_url":"https://github.com/CAOakleyII/chromebrew","homepage":"http://skycocker.github.io/chromebrew/","size":846,"stargazers_count":0,"watchers_count":0,"language":null,"has_issues":false,"has_downloads":true,"has_wiki":true,"has_pages":false,"forks_count":0,"mirror_url":null,"open_issues_count":0,"forks":0,"open_issues":0,"watchers":0,"default_branch":"mast
er","public":true}},"public":true,"created_at":"2015-07-01T00:00:01Z"} diff --git a/tajo-tablespace-example/src/test/resources/dataset/got.json b/tajo-tablespace-example/src/test/resources/dataset/got.json new file mode 100644 index 0000000000..db3ad6c7f0 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/dataset/got.json @@ -0,0 +1,3 @@ +{ "title" : "Hand of the King", "name" : { "first_name": "Eddard", "last_name": "Stark"}} +{ "title" : "Assassin", "name" : { "first_name": "Arya", "last_name": "Stark"}} +{ "title" : "Dancing Master", "name" : { "first_name": "Syrio", "last_name": "Forel"}} diff --git a/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testGroupby.sql b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testGroupby.sql new file mode 100644 index 0000000000..ea68caa8a1 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testGroupby.sql @@ -0,0 +1 @@ +select actor.id, count(*) from github group by actor.id; \ No newline at end of file diff --git a/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testJoin.sql b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testJoin.sql new file mode 100644 index 0000000000..e2c95c8240 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testJoin.sql @@ -0,0 +1 @@ +select g1.title from got g1, got g2 where g1.name.first_name = g2.name.first_name \ No newline at end of file diff --git a/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testSelect.sql b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testSelect.sql new file mode 100644 index 0000000000..80ce27f736 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testSelect.sql @@ -0,0 +1 @@ +select title, name.first_name from got; \ No newline at end of file diff 
--git a/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testSort.sql b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testSort.sql new file mode 100644 index 0000000000..d31ea941dd --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/queries/TestExampleHttpFileQuery/testSort.sql @@ -0,0 +1 @@ +select actor.id, actor.login from github order by type; \ No newline at end of file diff --git a/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testGroupby.result b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testGroupby.result new file mode 100644 index 0000000000..67797c83d6 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testGroupby.result @@ -0,0 +1,6 @@ +actor/id,?count +------------------------------- +206379,1 +11455393,1 +1088854,1 +5266949,1 diff --git a/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testJoin.result b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testJoin.result new file mode 100644 index 0000000000..a6da9fc1be --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testJoin.result @@ -0,0 +1,5 @@ +title +------------------------------- +Hand of the King +Assassin +Dancing Master diff --git a/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testSelect.result b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testSelect.result new file mode 100644 index 0000000000..651a9feb56 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testSelect.result @@ -0,0 +1,5 @@ +title,name/first_name +------------------------------- +Hand of the King,Eddard +Assassin,Arya +Dancing Master,Syrio diff --git a/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testSort.result 
b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testSort.result new file mode 100644 index 0000000000..e0073b9320 --- /dev/null +++ b/tajo-tablespace-example/src/test/resources/results/TestExampleHttpFileQuery/testSort.result @@ -0,0 +1,6 @@ +actor/id,actor/login +------------------------------- +206379,gvn +1088854,CAOakleyII +5266949,hardrubic +11455393,chrischjh diff --git a/tajo-yarn/pom.xml b/tajo-yarn/pom.xml index 70511a187b..cb472761d9 100644 --- a/tajo-yarn/pom.xml +++ b/tajo-yarn/pom.xml @@ -59,7 +59,6 @@ maven-assembly-plugin - 2.4.1 jar-with-dependencies