Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Release 0.12.0 - unreleased

NEW FEATURES

TAJO-1686: Allow Tajo to use Hive UDF. (jihoon)
TAJO-1686: Allow Tajo to use Hive UDF. (Jongyoung Park via jihoon)

TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ message SchemaProto {

message FragmentProto {
required string id = 1;
required string data_format = 2;
required string kind = 2;
required bytes contents = 3;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

package org.apache.tajo.datum;

import org.junit.Test;
import org.apache.tajo.common.TajoDataTypes;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class TestInt4Datum {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
Tablespace tablespace = TablespaceManager.getByName("cluster1");
List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
assertEquals(1, fragments.size());
assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
assertEquals("021", ((HBaseFragment)fragments.get(0)).getStartKey().toString());
assertEquals("021" + postFix, ((HBaseFragment)fragments.get(0)).getEndKey().toString());

// where rk >= '020' and rk <= '055'
EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
Expand All @@ -569,12 +569,12 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
assertEquals(2, fragments.size());
HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
assertEquals("040", new String(fragment1.getStopRow()));
assertEquals("020", fragment1.getStartKey().toString());
assertEquals("040", fragment1.getEndKey().toString());

HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
assertEquals("040", new String(fragment2.getStartRow()));
assertEquals("055" + postFix, new String(fragment2.getStopRow()));
assertEquals("040", fragment2.getStartKey().toString());
assertEquals("055" + postFix, fragment2.getEndKey().toString());

// where (rk >= '020' and rk <= '055') or rk = '075'
EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
Expand All @@ -584,16 +584,16 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual());
assertEquals(3, fragments.size());
fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
assertEquals("040", new String(fragment1.getStopRow()));
assertEquals("020", fragment1.getStartKey().toString());
assertEquals("040", fragment1.getEndKey().toString());

fragment2 = (HBaseFragment) fragments.get(1);
assertEquals("040", new String(fragment2.getStartRow()));
assertEquals("055" + postFix, new String(fragment2.getStopRow()));
assertEquals("040", fragment2.getStartKey().toString());
assertEquals("055" + postFix, fragment2.getEndKey().toString());

HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
assertEquals("075", new String(fragment3.getStartRow()));
assertEquals("075" + postFix, new String(fragment3.getStopRow()));
assertEquals("075", fragment3.getStartKey().toString());
assertEquals("075" + postFix, fragment3.getEndKey().toString());


// where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
Expand All @@ -608,16 +608,16 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
assertEquals(3, fragments.size());

fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
assertEquals("040", new String(fragment1.getStopRow()));
assertEquals("020", fragment1.getStartKey().toString());
assertEquals("040", fragment1.getEndKey().toString());

fragment2 = (HBaseFragment) fragments.get(1);
assertEquals("040", new String(fragment2.getStartRow()));
assertEquals("055" + postFix, new String(fragment2.getStopRow()));
assertEquals("040", fragment2.getStartKey().toString());
assertEquals("055" + postFix, fragment2.getEndKey().toString());

fragment3 = (HBaseFragment) fragments.get(2);
assertEquals("072", new String(fragment3.getStartRow()));
assertEquals("078" + postFix, new String(fragment3.getStopRow()));
assertEquals("072", fragment3.getStartKey().toString());
assertEquals("078" + postFix, fragment3.getEndKey().toString());

// where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
Expand All @@ -631,12 +631,12 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception
assertEquals(2, fragments.size());

fragment1 = (HBaseFragment) fragments.get(0);
assertEquals("020", new String(fragment1.getStartRow()));
assertEquals("040", new String(fragment1.getStopRow()));
assertEquals("020", fragment1.getStartKey().toString());
assertEquals("040", fragment1.getEndKey().toString());

fragment2 = (HBaseFragment) fragments.get(1);
assertEquals("040", new String(fragment2.getStartRow()));
assertEquals("059" + postFix, new String(fragment2.getStopRow()));
assertEquals("040", fragment2.getStartKey().toString());
assertEquals("059" + postFix, fragment2.getEndKey().toString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.CommonTestingUtil;
Expand All @@ -34,18 +36,20 @@
import static org.junit.Assert.assertTrue;

public class TestFileFragment {
private TajoConf conf;
private Path path;

@Before
public final void setUp() throws Exception {
conf = new TajoConf();
path = CommonTestingUtil.getTestDir();
}

@Test
public final void testGetAndSetFields() {
FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);

assertEquals("table1_1", fragment1.getTableName());
assertEquals("table1_1", fragment1.getInputSourceId());
assertEquals(new Path(path, "table0"), fragment1.getPath());
assertTrue(0 == fragment1.getStartKey());
assertTrue(500 == fragment1.getLength());
Expand All @@ -55,8 +59,9 @@ public final void testGetAndSetFields() {
public final void testGetProtoAndRestore() {
FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);

FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
assertEquals("table1_1", fragment1.getTableName());
FileFragment fragment1 = FragmentConvertor.convert(conf, BuiltinFragmentKinds.FILE,
FragmentConvertor.toFragmentProto(conf, fragment));
assertEquals("table1_1", fragment1.getInputSourceId());
assertEquals(new Path(path, "table0"), fragment1.getPath());
assertTrue(0 == fragment1.getStartKey());
assertTrue(500 == fragment1.getLength());
Expand All @@ -73,7 +78,7 @@ public final void testCompareTo() {
Arrays.sort(tablets);

for(int i = 0; i < num; i++) {
assertEquals("tablet1_"+i, tablets[i].getTableName());
assertEquals("tablet1_"+i, tablets[i].getInputSourceId());
}
}

Expand Down
140 changes: 0 additions & 140 deletions tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -923,13 +923,13 @@ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, St
PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
List<Fragment> fileFragments = new ArrayList<>();

FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri());
FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri());
for (Path path : partitionedTableScanNode.getInputPaths()) {
fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path)));
}

FragmentProto[] fragments =
FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()]));
FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()]));

ctx.addFragments(scanNode.getCanonicalName(), fragments);
return new PartitionMergeScanExec(ctx, scanNode, fragments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public BSTIndexScanExec(TaskAttemptContext context, IndexScanNode plan,

this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());

Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(context.getConf(), fragment));
this.reader = new BSTIndex(context.getConf()).
getIndexReader(indexPath, keySchema, comparator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, fi

mergedInputFragments = new ArrayList<>();
for (CatalogProtos.FragmentProto proto : fragments) {
FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
FileFragment fragment = FragmentConvertor.convert(context.getConf(), proto);
mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta()));
}
}
Expand Down Expand Up @@ -464,7 +464,7 @@ private Scanner externalMergeAndSort(List<Chunk> chunks) throws Exception {
debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem());
}
chunk.getMemoryTuples().release();
} else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
} else if (chunk.getFragment().getInputSourceId().contains(INTERMEDIATE_FILE_PREFIX)) {
localFS.delete(chunk.getFragment().getPath(), true);
numDeletedFiles++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.tajo.engine.planner.physical;

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;

public class IndexExecutorUtil {

public static String getIndexFileName(FragmentProto fragmentProto) {
FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, fragmentProto);
public static String getIndexFileName(Configuration conf, FragmentProto fragmentProto) {
FileFragment fileFragment = FragmentConvertor.convert(conf, fragmentProto);
StringBuilder sb = new StringBuilder();
sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;

import java.io.IOException;
Expand Down Expand Up @@ -120,7 +121,7 @@ public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf t
fragments.add(fileFragment);
}
}
return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[fragments.size()]));
return FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
}

/**
Expand Down
Loading