Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ atlassian-ide-plugin.xml

# Patch files
*.patch

# Antlr files
*.tokens
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
new ConstEval(new TextDatum("021")));
scanNode.setQual(evalNodeEq);
Tablespace tablespace = TablespaceManager.getByName("cluster1");
List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(1, fragments.size());
assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
Expand All @@ -566,7 +566,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
scanNode.setQual(evalNodeA);

fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(2, fragments.size());
HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
Expand All @@ -581,7 +581,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
new ConstEval(new TextDatum("075")));
EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
scanNode.setQual(evalNodeB);
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
Expand All @@ -604,7 +604,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
scanNode.setQual(evalNodeD);
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(3, fragments.size());

fragment1 = (HBaseFragment) fragments.get(0);
Expand All @@ -627,7 +627,7 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
scanNode.setQual(evalNodeD);
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
assertEquals(2, fragments.size());

fragment1 = (HBaseFragment) fragments.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,33 @@
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos.CodecType;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.TajoProtos.CodecType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec;
import org.apache.tajo.engine.planner.physical.ScanExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.io.AsyncTaskService;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.tuple.memory.MemoryBlock;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.util.CompressionUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
Expand Down Expand Up @@ -102,13 +104,8 @@ public void init() throws IOException, TajoException {
private void initSeqScanExec() throws IOException, TajoException {
Tablespace tablespace = TablespaceManager.get(tableDesc.getUri());

List<Fragment> fragments = Lists.newArrayList();
if (tableDesc.hasPartition()) {
FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class);
fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc));
} else {
fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode.getQual()));
}
List<Fragment> fragments = Lists.newArrayList(
SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));

if (!fragments.isEmpty()) {
FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.tajo.querymaster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -54,11 +53,15 @@
import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;
Expand Down Expand Up @@ -120,8 +123,9 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC
// if table has no data, tablespace will return empty FileFragment.
// So, we need to handle FileFragment by its size.
// If we don't check its size, it can cause IndexOutOfBoundsException.
Tablespace space = TablespaceManager.get(tableDesc.getUri());
List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc, null);
List<Fragment> fileFragments = SplitUtil.getSplits(
TablespaceManager.get(tableDesc.getUri()), scans[i], tableDesc, false);

if (fileFragments.size() > 0) {
fragments[i] = fileFragments.get(0);
} else {
Expand Down Expand Up @@ -385,27 +389,16 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster
//If there are more than one data files, that files should be added to fragments or partition path

for (ScanNode eachScan: broadcastScans) {
// TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with
// plain tables.
if (eachScan.getType() != NodeType.PARTITIONS_SCAN) {
TableDesc tableDesc = masterContext.getTableDesc(eachScan);

Path[] partitionScanPaths = null;
TableDesc tableDesc = masterContext.getTableDesc(eachScan);
Tablespace space = TablespaceManager.get(tableDesc.getUri());

if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {

PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc);
partitionScan.setInputPaths(partitionScanPaths);

} else {

Collection<Fragment> scanFragments =
space.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan.getQual());
Collection<Fragment> scanFragments = SplitUtil.getSplits(
TablespaceManager.get(tableDesc.getUri()), eachScan, tableDesc, false);
if (scanFragments != null) {
rightFragments.addAll(scanFragments);
}

}
}
}
Expand Down Expand Up @@ -468,24 +461,6 @@ public static Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> merge
return mergedHashEntries;
}

/**
* It creates a number of fragments for all partitions.
*/
public static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tsHandler,
ScanNode scan,
TableDesc table) throws IOException {
Preconditions.checkArgument(tsHandler instanceof FileTablespace, "tsHandler must be FileTablespace");
if (!(scan instanceof PartitionedTableScanNode)) {
throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
}
List<Fragment> fragments = Lists.newArrayList();
PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
fragments.addAll(((FileTablespace) tsHandler).getSplits(
scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
partitionsScan.setInputPaths(null);
return fragments;
}

private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage,
int baseScanId, Fragment[] fragments)
throws IOException, TajoException {
Expand Down Expand Up @@ -513,30 +488,15 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch
ScanNode scan = scans[i];
TableDesc desc = stage.getContext().getTableDesc(scan);

Collection<Fragment> scanFragments;
Path[] partitionScanPaths = null;


Tablespace space = TablespaceManager.get(desc.getUri());

if (scan.getType() == NodeType.PARTITIONS_SCAN) {
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scan;
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
scanFragments = getFragmentsFromPartitionedTable(space, scan, desc);
} else {
scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual());
}
Collection<Fragment> scanFragments = SplitUtil.getSplits(TablespaceManager.get(desc.getUri()), scan, desc, false);

if (scanFragments != null) {
if (i == baseScanId) {
baseFragments = scanFragments;
} else {
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
// PhysicalPlanner makes a PartitionMergeScanExec when the table is a broadcast table and inputPaths is not empty
partitionScan.setInputPaths(partitionScanPaths);
} else {
// TODO: This is a workaround to broadcast partitioned tables, and should be improved to be consistent with
// plain tables.
if (scan.getType() != NodeType.PARTITIONS_SCAN) {
broadcastFragments.addAll(scanFragments);
}
}
Expand Down
37 changes: 7 additions & 30 deletions tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
Expand All @@ -61,13 +60,12 @@
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.util.history.TaskHistory;
Expand Down Expand Up @@ -1185,34 +1183,13 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio
ScanNode scan = scans[0];
TableDesc table = stage.context.getTableDesc(scan);

Collection<Fragment> fragments;
Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri());

// Depending on scanner node's type, it creates fragments. If scan is for
// a partitioned table, It will creates lots fragments for all partitions.
// Otherwise, it creates at least one fragments for a table, which may
// span a number of blocks or possibly consists of a number of files.
//
// Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN.
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
// After calling this method, partition paths are removed from the physical plan.
fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table);
} else {
fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan.getQual());
}

Collection<Fragment> fragments = SplitUtil.getSplits(
TablespaceManager.get(scan.getTableDesc().getUri()), scan, table, false);
SplitUtil.preparePartitionScanPlanForSchedule(scan);
Stage.scheduleFragments(stage, fragments);
if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) {
//Leaf task of DefaultTaskScheduler should be fragment size
// EstimatedTaskNum determined number of initial container
stage.schedulerContext.setEstimatedTaskNum(fragments.size());
} else {
TajoConf conf = stage.context.getConf();
stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
(double) stage.schedulerContext.getTaskSize());
stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
}

// The number of leaf tasks should be the number of fragments.
stage.schedulerContext.setEstimatedTaskNum(fragments.size());
}
}

Expand Down
Loading