diff --git a/.gitignore b/.gitignore index 5bca77cf2c..276de0d2e6 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ atlassian-ide-plugin.xml # Patch files *.patch + +# Antlr files +*.tokens diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 69a02305d9..8914e3b035 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -553,7 +553,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception new ConstEval(new TextDatum("021"))); scanNode.setQual(evalNodeEq); Tablespace tablespace = TablespaceManager.getByName("cluster1"); - List fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); + List fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual()); assertEquals(1, fragments.size()); assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow())); assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow())); @@ -566,7 +566,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2); scanNode.setQual(evalNodeA); - fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual()); assertEquals(2, fragments.size()); HBaseFragment fragment1 = (HBaseFragment) fragments.get(0); assertEquals("020", new String(fragment1.getStartRow())); @@ -581,7 +581,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception new ConstEval(new TextDatum("075"))); EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3); scanNode.setQual(evalNodeB); - fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual()); assertEquals(3, fragments.size()); fragment1 = (HBaseFragment) fragments.get(0); assertEquals("020", new String(fragment1.getStartRow())); @@ -604,7 +604,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); scanNode.setQual(evalNodeD); - fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual()); assertEquals(3, fragments.size()); fragment1 = (HBaseFragment) fragments.get(0); @@ -627,7 +627,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); scanNode.setQual(evalNodeD); - fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual()); assertEquals(2, fragments.size()); fragment1 = (HBaseFragment) fragments.get(0); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index a1728ec2ca..871db89b52 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -26,31 +26,33 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos.CodecType; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import org.apache.tajo.TajoProtos.CodecType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec; import org.apache.tajo.engine.planner.physical.ScanExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.io.AsyncTaskService; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.querymaster.Repartitioner; -import org.apache.tajo.storage.*; +import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.tuple.memory.MemoryBlock; import org.apache.tajo.tuple.memory.MemoryRowBlock; import org.apache.tajo.util.CompressionUtil; -import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.SplitUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -102,13 +104,8 @@ public void init() throws IOException, TajoException { private void initSeqScanExec() throws IOException, TajoException { Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()); - List fragments = Lists.newArrayList(); - if (tableDesc.hasPartition()) { - FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class); - fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc)); - } else { - fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode.getQual())); - } + List fragments = Lists.newArrayList( + SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true)); if (!fragments.isEmpty()) { FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()])); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index ba051a3115..c264e3e75f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -19,7 +19,6 @@ package org.apache.tajo.querymaster; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,11 +53,15 @@ import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.querymaster.Task.PullHost; -import org.apache.tajo.storage.*; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.SplitUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.FetchImpl; @@ -120,8 +123,9 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC // if table has no data, tablespace will return empty FileFragment. // So, we need to handle FileFragment by its size. // If we don't check its size, it can cause IndexOutOfBoundsException. - Tablespace space = TablespaceManager.get(tableDesc.getUri()); - List fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc, null); + List fileFragments = SplitUtil.getSplits( + TablespaceManager.get(tableDesc.getUri()), scans[i], tableDesc, false); + if (fileFragments.size() > 0) { fragments[i] = fileFragments.get(0); } else { @@ -385,27 +389,16 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster //If there are more than one data files, that files should be added to fragments or partition path for (ScanNode eachScan: broadcastScans) { + // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with + // plain tables. + if (eachScan.getType() != NodeType.PARTITIONS_SCAN) { + TableDesc tableDesc = masterContext.getTableDesc(eachScan); - Path[] partitionScanPaths = null; - TableDesc tableDesc = masterContext.getTableDesc(eachScan); - Tablespace space = TablespaceManager.get(tableDesc.getUri()); - - if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { - - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; - partitionScanPaths = partitionScan.getInputPaths(); - // set null to inputPaths in getFragmentsFromPartitionedTable() - getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc); - partitionScan.setInputPaths(partitionScanPaths); - - } else { - - Collection scanFragments = - space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual()); + Collection scanFragments = SplitUtil.getSplits( + TablespaceManager.get(tableDesc.getUri()), eachScan, tableDesc, false); if (scanFragments != null) { rightFragments.addAll(scanFragments); } - } } } @@ -468,24 +461,6 @@ public static Map>> merge return mergedHashEntries; } - /** - * It creates a number of fragments for all partitions. - */ - public static List getFragmentsFromPartitionedTable(Tablespace tsHandler, - ScanNode scan, - TableDesc table) throws IOException { - Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace"); - if (!(scan instanceof PartitionedTableScanNode)) { - throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); - } - List fragments = Lists.newArrayList(); - PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; - fragments.addAll(((FileTablespace) tsHandler).getSplits( - scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths())); - partitionsScan.setInputPaths(null); - return fragments; - } - private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage, int baseScanId, Fragment[] fragments) throws IOException, TajoException { @@ -513,30 +488,15 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch ScanNode scan = scans[i]; TableDesc desc = stage.getContext().getTableDesc(scan); - Collection scanFragments; - Path[] partitionScanPaths = null; - - - Tablespace space = TablespaceManager.get(desc.getUri()); - - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan; - partitionScanPaths = partitionScan.getInputPaths(); - // set null to inputPaths in getFragmentsFromPartitionedTable() - scanFragments = getFragmentsFromPartitionedTable(space, scan, desc); - } else { - scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual()); - } + Collection scanFragments = SplitUtil.getSplits(TablespaceManager.get(desc.getUri()), scan, desc, false); if (scanFragments != null) { if (i == baseScanId) { baseFragments = scanFragments; } else { - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan; - // PhisicalPlanner make PartitionMergeScanExec when table is boradcast table and inputpaths is not empty - partitionScan.setInputPaths(partitionScanPaths); - } else { + // TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with + // plain tables. + if (scan.getType() != NodeType.PARTITIONS_SCAN) { broadcastFragments.addAll(scanFragments); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index fed75cd7fe..c055d119e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -52,7 +52,6 @@ import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -61,13 +60,12 @@ import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.storage.FileTablespace; -import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.RpcParameterFactory; +import org.apache.tajo.util.SplitUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; @@ -1185,34 +1183,13 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio ScanNode scan = scans[0]; TableDesc table = stage.context.getTableDesc(scan); - Collection fragments; - Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()); - - // Depending on scanner node's type, it creates fragments. If scan is for - // a partitioned table, It will creates lots fragments for all partitions. - // Otherwise, it creates at least one fragments for a table, which may - // span a number of blocks or possibly consists of a number of files. - // - // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN. - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - // After calling this method, partition paths are removed from the physical plan. - fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table); - } else { - fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan.getQual()); - } - + Collection fragments = SplitUtil.getSplits( + TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false); + SplitUtil.preparePartitionScanPlanForSchedule(scan); Stage.scheduleFragments(stage, fragments); - if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) { - //Leaf task of DefaultTaskScheduler should be fragment size - // EstimatedTaskNum determined number of initial container - stage.schedulerContext.setEstimatedTaskNum(fragments.size()); - } else { - TajoConf conf = stage.context.getConf(); - stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024); - int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() / - (double) stage.schedulerContext.getTaskSize()); - stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum); - } + + // The number of leaf tasks should be the number of fragments. + stage.schedulerContext.setEstimatedTaskNum(fragments.size()); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java new file mode 100644 index 0000000000..5f66b07e85 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/SplitUtil.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.PartitionedTableScanNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.querymaster.Stage; +import org.apache.tajo.storage.FileTablespace; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class SplitUtil { + + /** + * This method creates fragments depending on the table type. If the table is + * a partitioned table, it will creates multiple fragments for all partitions. + * Otherwise, it creates at least one fragments for a table, which may + * span a number of blocks or possibly consists of a number of files. + * + * Also, we can ensure FileTableSpace if the type of table is a partitioned table. + * + * @param tablespace tablespace handler + * @param scan scan node + * @param tableDesc table desc of scan node + * @param requireSort if set, the result fragments will be sorted with their paths. + * Only set when a query type is the simple query. + * @return a list of fragments for input table + * @throws IOException + * @throws TajoException + */ + public static List getSplits(Tablespace tablespace, + ScanNode scan, + TableDesc tableDesc, + boolean requireSort) + throws IOException, TajoException { + List fragments; + if (tableDesc.hasPartition()) { + // TODO: Partition tables should also be handled by tablespace. + fragments = SplitUtil.getFragmentsFromPartitionedTable(tablespace, scan, tableDesc, requireSort); + } else { + fragments = tablespace.getSplits(scan.getCanonicalName(), tableDesc, requireSort, scan.getQual()); + } + + return fragments; + } + + /** + * It creates a number of fragments for all partitions. + */ + private static List getFragmentsFromPartitionedTable(Tablespace tsHandler, + ScanNode scan, + TableDesc table, + boolean requireSort) throws IOException { + Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace"); + if (!(scan instanceof PartitionedTableScanNode)) { + throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); + } + List fragments = Lists.newArrayList(); + PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; + fragments.addAll(((FileTablespace) tsHandler).getSplits( + scan.getCanonicalName(), table.getMeta(), table.getSchema(), requireSort, partitionsScan.getInputPaths())); + return fragments; + } + + /** + * Clear input paths of {@link PartitionedTableScanNode}. + * This is to avoid unnecessary transmission of a lot of partition table paths to workers. + * So, this method should be invoked before {@link org.apache.tajo.querymaster.Stage#scheduleFragment(Stage, Fragment)} + * unless the scan is broadcasted. + * + * @param scanNode scan node + */ + public static void preparePartitionScanPlanForSchedule(ScanNode scanNode) { + if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { + // TODO: The partition input paths don't have to be kept in a logical node at all. + // This should be improved by implementing a specialized fragment for partition tables. + PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scanNode; + partitionScan.clearInputPaths(); + } + } +} diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java index 617688228f..d3f714837f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java @@ -29,9 +29,10 @@ import org.apache.tajo.util.TUtil; import java.util.ArrayList; +import java.util.Arrays; public class PartitionedTableScanNode extends ScanNode { - @Expose Path [] inputPaths; + @Expose private Path [] inputPaths; public PartitionedTableScanNode(int pid) { super(pid, NodeType.PARTITIONS_SCAN); @@ -43,15 +44,22 @@ public void init(ScanNode scanNode, Path[] inputPaths) { setOutSchema(scanNode.getOutSchema()); this.qual = scanNode.qual; this.targets = scanNode.targets; - this.inputPaths = inputPaths; + setInputPaths(inputPaths); if (scanNode.hasAlias()) { alias = scanNode.alias; } } + public void clearInputPaths() { + this.inputPaths = null; + } + public void setInputPaths(Path [] paths) { this.inputPaths = paths; + if (this.inputPaths != null) { + Arrays.sort(inputPaths); + } } public Path [] getInputPaths() { diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 00e6d75a12..4afb383c29 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -124,14 +124,17 @@ public URI getRootUri() { /** * Returns the splits that will serve as input for the scan tasks. The * number of splits matches the number of regions in a table. + * * @param inputSourceId Input source identifier, which can be either relation name or execution block id * @param tableDesc The table description for the target data. + * @param requireSort The result fragments will be sorted with their paths. * @param filterCondition filter condition which can prune splits if possible * @return The list of input fragments. * @throws java.io.IOException */ public abstract List getSplits(String inputSourceId, TableDesc tableDesc, + boolean requireSort, @Nullable EvalNode filterCondition) throws IOException, TajoException; /** diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 132ceff0ae..0cf883eb4f 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -557,6 +557,7 @@ private Collection convertRangeToFragment( @Override public List getSplits(String inputSourceId, TableDesc table, + boolean requireSorted, @Nullable EvalNode filterCondition) throws IOException, TajoException { return (List) (List) getRawSplits(inputSourceId, table, filterCondition); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 2aa2b91025..f3cb9a5144 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -304,10 +304,12 @@ public boolean accept(Path path) { * Subclasses may override to, e.g., select only files matching a regular * expression. * + * @param requireSort if set, result will be sorted by their paths. + * @param dirs input dirs * @return array of FileStatus objects * @throws IOException if zero items. */ - protected List listStatus(Path... dirs) throws IOException { + protected List listStatus(boolean requireSort, Path... dirs) throws IOException { List result = new ArrayList<>(); if (dirs.length == 0) { throw new IOException("No input paths specified in job"); @@ -342,6 +344,10 @@ protected List listStatus(Path... dirs) throws IOException { if (!errors.isEmpty()) { throw new InvalidInputException(errors); } + + if (requireSort) { + Collections.sort(result); + } LOG.info("Total input paths to process : " + result.size()); return result; } @@ -464,7 +470,7 @@ private int[] getDiskIds(VolumeId[] volumeIds) { * * @throws IOException */ - public List getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs) + public List getSplits(String tableName, TableMeta meta, Schema schema, boolean requireSort, Path... inputs) throws IOException { // generate splits' @@ -477,7 +483,7 @@ public List getSplits(String tableName, TableMeta meta, Schema schema, if (fs.isFile(p)) { files.addAll(Lists.newArrayList(fs.getFileStatus(p))); } else { - files.addAll(listStatus(p)); + files.addAll(listStatus(requireSort, p)); } int previousSplitSize = splits.size(); @@ -606,8 +612,9 @@ public String getMessage(){ @Override public List getSplits(String inputSourceId, TableDesc table, + boolean requireSort, @Nullable EvalNode filterCondition) throws IOException { - return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri())); + return getSplits(inputSourceId, table.getMeta(), table.getSchema(), requireSort, new Path(table.getUri())); } @Override diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java index c62a01cf26..18183300fa 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -127,7 +127,7 @@ public void testBlockSplit() throws IOException { appender.close(); FileStatus fileStatus = fs.getFileStatus(path); - List splits = sm.getSplits("table", meta, schema, path); + List splits = sm.getSplits("table", meta, schema, false, path); int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); assertEquals(splitSize, splits.size()); diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index dc8781ef38..ad45d18b93 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -156,13 +156,13 @@ public void testGetSplit() throws Exception { List splits = Lists.newArrayList(); // Get FileFragments in partition batch - splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); + splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()]))); assertEquals(testCount, splits.size()); // -1 is unknown volumeId assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); splits.clear(); - splits.addAll(space.getSplits("data", meta, schema, + splits.addAll(space.getSplits("data", meta, schema, false, partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2]))); assertEquals(testCount / 2, splits.size()); assertEquals(1, splits.get(0).getHosts().length); @@ -212,7 +212,7 @@ public void testZeroLengthSplit() throws Exception { List splits = Lists.newArrayList(); // Get FileFragments in partition batch - splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); + splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()]))); assertEquals(0, splits.size()); fs.close(); @@ -256,7 +256,7 @@ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, conf); List splits = Lists.newArrayList(); - splits.addAll(sm.getSplits("data", meta, schema, tablePath)); + splits.addAll(sm.getSplits("data", meta, schema, false, tablePath)); assertEquals(testCount, splits.size()); assertEquals(2, splits.get(0).getHosts().length); diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 3e363406be..c1423d7899 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -269,7 +269,7 @@ public void testZeroRows() throws IOException { assertEquals(0, fileStatus.getLen()); } - List splits = sm.getSplits("testZeroRows", meta, schema, testDir); + List splits = sm.getSplits("testZeroRows", meta, schema, false, testDir); int tupleCnt = 0; for (Fragment fragment : splits) { Scanner scanner = sm.getScanner(meta, schema, fragment, schema); diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index fa6cf486e2..7a630b2db7 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -122,6 +122,7 @@ public URI getTableUri(String databaseName, String tableName) { @Override public List getSplits(String inputSourceId, TableDesc tableDesc, + boolean requireSorted, @Nullable EvalNode filterCondition) throws IOException { return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString())); } diff --git a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java index 0d0e15ec26..875d70ee5d 100644 --- a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java +++ b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/TestPgSQLJdbcTableSpace.java @@ -74,7 +74,7 @@ public void testGetSplits() throws IOException, TajoException { Tablespace space = TablespaceManager.getByName("pgsql_cluster"); MetadataProvider provider = space.getMetadataProvider(); TableDesc table = provider.getTableDesc(null, "lineitem"); - List fragments = space.getSplits("lineitem", table, null); + List fragments = space.getSplits("lineitem", table, false, null); assertNotNull(fragments); assertEquals(1, fragments.size()); }