diff --git a/CHANGES b/CHANGES index 182f1d5c4a..b2525a242d 100644 --- a/CHANGES +++ b/CHANGES @@ -4,7 +4,7 @@ Release 0.12.0 - unreleased NEW FEATURES - TAJO-1686: Allow Tajo to use Hive UDF. (jihoon) + TAJO-1686: Allow Tajo to use Hive UDF. (Jongyoung Park via jihoon) TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon) diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index e79bc75dbb..b42cf58a5f 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -75,7 +75,7 @@ message SchemaProto { message FragmentProto { required string id = 1; - required string data_format = 2; + required string kind = 2; required bytes contents = 3; } diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java index 559bed3bff..294f8bb05a 100644 --- a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java +++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java @@ -18,12 +18,10 @@ package org.apache.tajo.datum; -import org.junit.Test; import org.apache.tajo.common.TajoDataTypes; +import org.junit.Test; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestInt4Datum { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 69a02305d9..f4a9bcbb19 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -555,8 +555,8 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception Tablespace tablespace = TablespaceManager.getByName("cluster1"); List fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); assertEquals(1, fragments.size()); - assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow())); - assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow())); + assertEquals("021", ((HBaseFragment)fragments.get(0)).getStartKey().toString()); + assertEquals("021" + postFix, ((HBaseFragment)fragments.get(0)).getEndKey().toString()); // where rk >= '020' and rk <= '055' EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), @@ -569,12 +569,12 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); assertEquals(2, fragments.size()); HBaseFragment fragment1 = (HBaseFragment) fragments.get(0); - assertEquals("020", new String(fragment1.getStartRow())); - assertEquals("040", new String(fragment1.getStopRow())); + assertEquals("020", fragment1.getStartKey().toString()); + assertEquals("040", fragment1.getEndKey().toString()); HBaseFragment fragment2 = (HBaseFragment) fragments.get(1); - assertEquals("040", new String(fragment2.getStartRow())); - assertEquals("055" + postFix, new String(fragment2.getStopRow())); + assertEquals("040", fragment2.getStartKey().toString()); + assertEquals("055" + postFix, fragment2.getEndKey().toString()); // where (rk >= '020' and rk <= '055') or rk = '075' EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), @@ -584,16 +584,16 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode.getQual()); assertEquals(3, fragments.size()); fragment1 = (HBaseFragment) fragments.get(0); - assertEquals("020", new String(fragment1.getStartRow())); - assertEquals("040", new String(fragment1.getStopRow())); + assertEquals("020", fragment1.getStartKey().toString()); + assertEquals("040", fragment1.getEndKey().toString()); fragment2 = (HBaseFragment) fragments.get(1); - assertEquals("040", new String(fragment2.getStartRow())); - assertEquals("055" + postFix, new String(fragment2.getStopRow())); + assertEquals("040", fragment2.getStartKey().toString()); + assertEquals("055" + postFix, fragment2.getEndKey().toString()); HBaseFragment fragment3 = (HBaseFragment) fragments.get(2); - assertEquals("075", new String(fragment3.getStartRow())); - assertEquals("075" + postFix, new String(fragment3.getStopRow())); + assertEquals("075", fragment3.getStartKey().toString()); + assertEquals("075" + postFix, fragment3.getEndKey().toString()); // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078') @@ -608,16 +608,16 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception assertEquals(3, fragments.size()); fragment1 = (HBaseFragment) fragments.get(0); - assertEquals("020", new String(fragment1.getStartRow())); - assertEquals("040", new String(fragment1.getStopRow())); + assertEquals("020", fragment1.getStartKey().toString()); + assertEquals("040", fragment1.getEndKey().toString()); fragment2 = (HBaseFragment) fragments.get(1); - assertEquals("040", new String(fragment2.getStartRow())); - assertEquals("055" + postFix, new String(fragment2.getStopRow())); + assertEquals("040", fragment2.getStartKey().toString()); + assertEquals("055" + postFix, fragment2.getEndKey().toString()); fragment3 = (HBaseFragment) fragments.get(2); - assertEquals("072", new String(fragment3.getStartRow())); - assertEquals("078" + postFix, new String(fragment3.getStopRow())); + assertEquals("072", fragment3.getStartKey().toString()); + assertEquals("078" + postFix, fragment3.getEndKey().toString()); // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059') evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), @@ -631,12 +631,12 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception assertEquals(2, fragments.size()); fragment1 = (HBaseFragment) fragments.get(0); - assertEquals("020", new String(fragment1.getStartRow())); - assertEquals("040", new String(fragment1.getStopRow())); + assertEquals("020", fragment1.getStartKey().toString()); + assertEquals("040", fragment1.getEndKey().toString()); fragment2 = (HBaseFragment) fragments.get(1); - assertEquals("040", new String(fragment2.getStartRow())); - assertEquals("059" + postFix, new String(fragment2.getStopRow())); + assertEquals("040", fragment2.getStartKey().toString()); + assertEquals("059" + postFix, fragment2.getEndKey().toString()); } @Test diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java index 86f7d1a730..cccff709f1 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java @@ -20,6 +20,8 @@ import com.google.common.collect.Sets; import org.apache.hadoop.fs.Path; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.fragment.BuiltinFragmentKinds; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.CommonTestingUtil; @@ -34,10 +36,12 @@ import static org.junit.Assert.assertTrue; public class TestFileFragment { + private TajoConf conf; private Path path; @Before public final void setUp() throws Exception { + conf = new TajoConf(); path = CommonTestingUtil.getTestDir(); } @@ -45,7 +49,7 @@ public final void setUp() throws Exception { public final void testGetAndSetFields() { FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500); - assertEquals("table1_1", fragment1.getTableName()); + assertEquals("table1_1", fragment1.getInputSourceId()); assertEquals(new Path(path, "table0"), fragment1.getPath()); assertTrue(0 == fragment1.getStartKey()); assertTrue(500 == fragment1.getLength()); @@ -55,8 +59,9 @@ public final void testGetAndSetFields() { public final void testGetProtoAndRestore() { FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500); - FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto()); - assertEquals("table1_1", fragment1.getTableName()); + FileFragment fragment1 = FragmentConvertor.convert(conf, BuiltinFragmentKinds.FILE, + FragmentConvertor.toFragmentProto(conf, fragment)); + assertEquals("table1_1", fragment1.getInputSourceId()); assertEquals(new Path(path, "table0"), fragment1.getPath()); assertTrue(0 == fragment1.getStartKey()); assertTrue(500 == fragment1.getLength()); @@ -73,7 +78,7 @@ public final void testCompareTo() { Arrays.sort(tablets); for(int i = 0; i < num; i++) { - assertEquals("tablet1_"+i, tablets[i].getTableName()); + assertEquals("tablet1_"+i, tablets[i].getInputSourceId()); } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java deleted file mode 100644 index 0173f59083..0000000000 --- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.TpchTestBase; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaBuilder; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.Set; - -import static org.junit.Assert.assertEquals; - -public class TestRowFile { - private static final Log LOG = LogFactory.getLog(TestRowFile.class); - - private TajoTestingCluster cluster; - private TajoConf conf; - - @Before - public void setup() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); - conf = cluster.getConfiguration(); - } - - @After - public void teardown() throws Exception { - } - - @Test - public void test() throws IOException { - Schema schema = SchemaBuilder.builder() - .add("id", Type.INT4) - .add("age", Type.INT8) - .add("description", Type.TEXT) - .build(); - - TableMeta meta = CatalogUtil.newTableMeta("ROWFILE", conf); - - FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri()); - - Path tablePath = new Path("/test"); - Path dataPath = new Path(tablePath, "test.tbl"); - FileSystem fs = sm.getFileSystem(); - fs.mkdirs(tablePath); - - Appender appender = sm.getAppender(meta, schema, dataPath); - appender.enableStats(); - appender.init(); - - int tupleNum = 200; - Tuple tuple; - Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz"); - Set idSet = Sets.newHashSet(); - - tuple = new VTuple(3); - long start = System.currentTimeMillis(); - for(int i = 0; i < tupleNum; i++) { - tuple.put(0, DatumFactory.createInt4(i + 1)); - tuple.put(1, DatumFactory.createInt8(25l)); - tuple.put(2, stringDatum); - appender.addTuple(tuple); - idSet.add(i+1); - } - appender.close(); - - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - - FileStatus file = fs.getFileStatus(dataPath); - FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen()); - - int tupleCnt = 0; - start = System.currentTimeMillis(); - Scanner scanner = sm.getScanner(meta, schema, fragment, null); - scanner.init(); - while ((tuple=scanner.next()) != null) { - tupleCnt++; - } - scanner.close(); - - assertEquals(tupleNum, tupleCnt); - - tupleCnt = 0; - long fileStart = 0; - long fileLen = file.getLen()/13; - - for (int i = 0; i < 13; i++) { - fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen); - scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment); - scanner.init(); - while ((tuple=scanner.next()) != null) { - if (!idSet.remove(tuple.getInt4(0)) && LOG.isDebugEnabled()) { - LOG.debug("duplicated! " + tuple.getInt4(0)); - } - tupleCnt++; - } - scanner.close(); - fileStart += fileLen; - if (i == 11) { - fileLen = file.getLen() - fileStart; - } - } - assertEquals(tupleNum, tupleCnt); - } -} diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 87a6e74ba9..fce56fc366 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -923,13 +923,13 @@ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, St PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; List fileFragments = new ArrayList<>(); - FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri()); + FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri()); for (Path path : partitionedTableScanNode.getInputPaths()) { fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path))); } FragmentProto[] fragments = - FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])); + FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()])); ctx.addFragments(scanNode.getCanonicalName(), fragments); return new PartitionMergeScanExec(ctx, scanNode, fragments); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 3be1d365d3..7ab0943915 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -86,7 +86,7 @@ public BSTIndexScanExec(TaskAttemptContext context, IndexScanNode plan, this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment)); + Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(context.getConf(), fragment)); this.reader = new BSTIndex(context.getConf()). getIndexReader(indexPath, keySchema, comparator); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 3d2de28fca..4ec5144107 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -170,7 +170,7 @@ public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, fi mergedInputFragments = new ArrayList<>(); for (CatalogProtos.FragmentProto proto : fragments) { - FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto); + FileFragment fragment = FragmentConvertor.convert(context.getConf(), proto); mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta())); } } @@ -464,7 +464,7 @@ private Scanner externalMergeAndSort(List chunks) throws Exception { debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem()); } chunk.getMemoryTuples().release(); - } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { + } else if (chunk.getFragment().getInputSourceId().contains(INTERMEDIATE_FILE_PREFIX)) { localFS.delete(chunk.getFragment().getPath(), true); numDeletedFiles++; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java index 3b8317fe75..df143c8baf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java @@ -18,14 +18,15 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.hadoop.conf.Configuration; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; public class IndexExecutorUtil { - public static String getIndexFileName(FragmentProto fragmentProto) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, fragmentProto); + public static String getIndexFileName(Configuration conf, FragmentProto fragmentProto) { + FileFragment fileFragment = FragmentConvertor.convert(conf, fragmentProto); StringBuilder sb = new StringBuilder(); sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); return sb.toString(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index d1dfe40430..074d0abed3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -40,6 +40,7 @@ import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import java.io.IOException; @@ -120,7 +121,7 @@ public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf t fragments.add(fileFragment); } } - return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[fragments.size()])); + return FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()])); } /** diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index dc48f3fb0f..52cb080edd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -97,7 +97,7 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { Tuple partitionRow = null; if (fragments != null && fragments.length > 0) { - List fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); + List fileFragments = FragmentConvertor.convert(context.getConf(), fragments); // Get a partition key value from a given path partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java index c5e10939f0..46f672d1c1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -72,7 +72,7 @@ public void init() throws IOException { TajoConf conf = context.getConf(); Path indexPath = new Path(logicalPlan.getIndexPath().toString(), - IndexExecutorUtil.getIndexFileName(scanExec.getFragments()[0])); + IndexExecutorUtil.getIndexFileName(conf, scanExec.getFragments()[0])); // TODO: Create factory using reflection BSTIndex bst = new BSTIndex(conf); this.comparator = new BaseTupleComparator(keySchema, sortSpecs); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index a1728ec2ca..77048ec7fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -111,7 +111,9 @@ private void initSeqScanExec() throws IOException, TajoException { } if (!fragments.isEmpty()) { - FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()])); + FragmentProto[] fragmentProtos = + FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()])); + this.taskContext = new TaskAttemptContext( new QueryContext(tajoConf), null, new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index f8b89f1743..f1ad931e47 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -281,7 +282,7 @@ private TaskHistory makeTaskHistory() { fragmentList.add(fragment.toString()); } catch (Exception e) { LOG.error(e.getMessage(), e); - fragmentList.add("ERROR: " + eachFragment.getDataFormat() + "," + eachFragment.getId() + ": " + e.getMessage()); + fragmentList.add("ERROR: " + eachFragment.getKind() + "," + eachFragment.getId() + ": " + e.getMessage()); } } taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()])); @@ -330,25 +331,25 @@ public void setLogicalPlan(LogicalNode plan) { } private void addDataLocation(Fragment fragment) { - String[] hosts = fragment.getHosts(); - int[] diskIds = null; + ImmutableList hosts = fragment.getHostNames(); + Integer[] diskIds = null; if (fragment instanceof FileFragment) { diskIds = ((FileFragment)fragment).getDiskIds(); } - for (int i = 0; i < hosts.length; i++) { - dataLocations.add(new DataLocation(hosts[i], diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i])); + for (int i = 0; i < hosts.size(); i++) { + dataLocations.add(new DataLocation(hosts.get(i), diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i])); } } public void addFragment(Fragment fragment, boolean useDataLocation) { Set fragmentProtos; - if (fragMap.containsKey(fragment.getTableName())) { - fragmentProtos = fragMap.get(fragment.getTableName()); + if (fragMap.containsKey(fragment.getInputSourceId())) { + fragmentProtos = fragMap.get(fragment.getInputSourceId()); } else { fragmentProtos = new HashSet<>(); - fragMap.put(fragment.getTableName(), fragmentProtos); + fragMap.put(fragment.getInputSourceId(), fragmentProtos); } - fragmentProtos.add(fragment.getProto()); + fragmentProtos.add(FragmentConvertor.toFragmentProto(systemConf, fragment)); if (useDataLocation) { addDataLocation(fragment); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 47f1af292b..1cba931001 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -128,7 +128,8 @@ public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext @VisibleForTesting public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId, final Fragment [] fragments, final Path workDir) { - this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir); + this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(queryContext.getConf(), fragments), + workDir); } public TajoConf getConf() { @@ -258,12 +259,12 @@ public Map getPartitionOutputVolume() { public void updateAssignedFragments(String tableId, Fragment[] fragments) { fragmentMap.remove(tableId); for(Fragment t : fragments) { - if (fragmentMap.containsKey(t.getTableName())) { - fragmentMap.get(t.getTableName()).add(t.getProto()); + if (fragmentMap.containsKey(t.getInputSourceId())) { + fragmentMap.get(t.getInputSourceId()).add(FragmentConvertor.toFragmentProto(getConf(), t)); } else { List frags = new ArrayList<>(); - frags.add(t.getProto()); - fragmentMap.put(t.getTableName(), frags); + frags.add(FragmentConvertor.toFragmentProto(getConf(), t)); + fragmentMap.put(t.getInputSourceId(), frags); } } } @@ -281,7 +282,7 @@ public void addFragments(String tableId, FragmentProto[] fragments) { List paths = fragmentToPath(tableFragments); for (FragmentProto eachFragment: fragments) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); + FileFragment fileFragment = FragmentConvertor.convert(getConf(), eachFragment); // If current attempt already has same path, we don't need to add it to fragments. if (!paths.contains(fileFragment.getPath())) { tableFragments.add(eachFragment); @@ -297,7 +298,7 @@ private List fragmentToPath(List tableFragments) { List list = new ArrayList<>(); for (FragmentProto proto : tableFragments) { - FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto); + FileFragment fragment = FragmentConvertor.convert(getConf(), proto); list.add(fragment.getPath()); } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 00e6d75a12..c1c4a2b6e1 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -240,6 +240,21 @@ public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema sc return (SeekableScanner)this.getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); } + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws IOException + */ + public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, Fragment fragment, + Schema target) throws IOException { + return (SeekableScanner)this.getScanner(meta, schema, fragment, target); + } + /** * Returns Appender instance. * @param queryContext Query property. diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java new file mode 100644 index 0000000000..9c4fce5c92 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.fragment; + +public class BuiltinFragmentKinds { + public static final String FILE = "FILE"; + public static final String HBASE = "HBASE"; + public static final String JDBC = "JDBC"; +} diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java index ac43197296..a8de4abad7 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java @@ -18,22 +18,150 @@ package org.apache.tajo.storage.fragment; -import org.apache.tajo.common.ProtoObject; +import com.google.common.collect.ImmutableList; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import java.net.URI; -public interface Fragment extends ProtoObject { +/** + * The Fragment is similar to the split in MapReduce. + * For distributed processing of a a single large table, + * it contains the information of which part of data will be processed by each task. + * + * @param type of fragment key. It should implement the Comparable interface. + */ +public abstract class Fragment implements Comparable>, Cloneable { - public abstract String getTableName(); + protected String kind; + protected URI uri; + protected String inputSourceId; + protected T startKey; + protected T endKey; + protected long length; + protected ImmutableList hostNames; - @Override - public abstract FragmentProto getProto(); + protected Fragment(String kind, + URI uri, + String inputSourceId, + T startKey, + T endKey, + long length, + String[] hostNames) { + this.kind = kind; + this.uri = uri; + this.inputSourceId = inputSourceId; + this.startKey = startKey; + this.endKey = endKey; + this.length = length; + this.hostNames = hostNames == null ? ImmutableList.of() : ImmutableList.copyOf(hostNames); + } + + /** + * Returns the fragment type. + * + * @return fragment type + */ + public final String getKind() { + return kind; + } + + /** + * Returns an unique URI of the input source. + * + * @return URI of the input source + */ + public final URI getUri() { + return uri; + } - public abstract long getLength(); + /** + * Returns a unique id of the input source. + * + * @return id of the input source + */ + public final String getInputSourceId() { + return this.inputSourceId; + } - public abstract String getKey(); + /** + * Returns the start key of the data range. + * {@link org.apache.tajo.storage.Scanner} will start reading data from the point indicated by this key. + * + * @return start key + */ + public final T getStartKey() { + return startKey; + } - public String[] getHosts(); + /** + * Returns the end key of the data range. + * {@link org.apache.tajo.storage.Scanner} will stop reading data when it reaches the point indicated by this key. + * + * @return end key + */ + public final T getEndKey() { + return endKey; + } - public abstract boolean isEmpty(); + /** + * Returns the length of the data range between start key and end key. + * + * @return length of the range + */ + public final long getLength() { + return length; + } + + /** + * Returns host names which have any portion of the data between start key and end key. + * + * @return host names + */ + public final ImmutableList getHostNames() { + return hostNames; + } + + /** + * Indicates the fragment is empty or not. + * An empty fragment means that there is no data to read. + * + * @return true if the fragment is empty. Otherwise, false. + */ + public boolean isEmpty() { + return length == 0; + } + + /** + * First compares URIs of fragments, and then compares their start keys. + * + * @param t + * @return return 0 if two fragments are same. If not same, return -1 if this fragment is smaller than the other. + * Otherwise, return 1; + */ + @Override + public final int compareTo(Fragment t) { + int cmp = uri.compareTo(t.uri); + if (cmp == 0) { + if (startKey != null && t.startKey != null) { + return startKey.compareTo(t.startKey); + } else if (startKey == null) { // nulls last + return 1; + } else { + return -1; + } + } else { + return cmp; + } + } + + @Override + public Object clone() throws CloneNotSupportedException { + Fragment clone = (Fragment) super.clone(); + clone.uri = this.uri; + clone.inputSourceId = this.inputSourceId; + clone.startKey = this.startKey; + clone.endKey = this.endKey; + clone.hostNames = this.hostNames; + clone.length = this.length; + return clone; + } } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java index 4ce6928b84..6c658f9eee 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java @@ -20,13 +20,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.conf.Configuration; import org.apache.tajo.annotation.ThreadSafe; import org.apache.tajo.exception.TajoInternalError; -import java.io.IOException; -import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; @@ -34,95 +33,81 @@ @ThreadSafe public class FragmentConvertor { - /** - * Cache of fragment classes - */ - protected static final Map> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap(); - /** - * Cache of constructors for each class. - */ - private static final Map, Constructor> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - /** - * default parameter for all constructors - */ - private static final Class[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class }; - public static Class getFragmentClass(Configuration conf, String dataFormat) { - Class fragmentClass = CACHED_FRAGMENT_CLASSES.get(dataFormat.toLowerCase()); - if (fragmentClass == null) { - fragmentClass = conf.getClass( - String.format("tajo.storage.fragment.%s.class", dataFormat.toLowerCase()), null, Fragment.class); - CACHED_FRAGMENT_CLASSES.put(dataFormat.toLowerCase(), fragmentClass); + private static final Map SERDE_HELPER_MAP = Maps.newConcurrentMap(); + + private static FragmentSerdeHelper getFragmentSerdeHelper(Configuration conf, String fragmentKind) { + fragmentKind = fragmentKind.toLowerCase(); + FragmentSerdeHelper helper = SERDE_HELPER_MAP.get(fragmentKind); + if (helper == null) { + Class helperClass = conf.getClass( + String.format("tajo.storage.fragment.serde-helper.%s", fragmentKind), null, FragmentSerdeHelper.class); + try { + helper = helperClass.getConstructor(null).newInstance(); + } catch (InstantiationException + | IllegalAccessException + | InvocationTargetException + | NoSuchMethodException e) { + throw new TajoInternalError(e); + } + SERDE_HELPER_MAP.put(fragmentKind, helper); } - if (fragmentClass == null) { - throw new TajoInternalError("No such a fragment for " + dataFormat.toLowerCase()); + if (helper == null) { + throw new TajoInternalError("No such a serde helper for " + fragmentKind); } - return fragmentClass; + return helper; } - public static T convert(Class clazz, FragmentProto fragment) { - T result; + public static T convert(Configuration conf, String fragmentKind, FragmentProto fragment) { + FragmentSerdeHelper helper = getFragmentSerdeHelper(conf, fragmentKind); try { - Constructor constructor = (Constructor) CONSTRUCTOR_CACHE.get(clazz); - if (constructor == null) { - constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(clazz, constructor); - } - result = constructor.newInstance(new Object[]{fragment.getContents()}); - } catch (Throwable e) { + return (T) helper.deserialize( + helper.newBuilder() + .mergeFrom(fragment.getContents()) + .build()); + } catch (InvalidProtocolBufferException e) { throw new TajoInternalError(e); } - - return result; } public static T convert(Configuration conf, FragmentProto fragment) { - Class fragmentClass = (Class) getFragmentClass(conf, fragment.getDataFormat().toLowerCase()); - if (fragmentClass == null) { - throw new TajoInternalError("No such a fragment class for " + fragment.getDataFormat()); - } - return convert(fragmentClass, fragment); + return convert(conf, fragment.getKind(), fragment); } - public static List convert(Class clazz, FragmentProto...fragments) - throws IOException { + public static List convert(Configuration conf, FragmentProto...fragments) { List list = Lists.newArrayList(); if (fragments == null) { return list; } for (FragmentProto proto : fragments) { - list.add(convert(clazz, proto)); + list.add(convert(conf, proto)); } return list; } - public static List convert(Configuration conf, FragmentProto...fragments) { - List list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (FragmentProto proto : fragments) { - list.add((T) convert(conf, proto)); - } - return list; + public static FragmentProto toFragmentProto(Configuration conf, Fragment fragment) { + FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); + fragmentBuilder.setId(fragment.getInputSourceId()); + fragmentBuilder.setKind(fragment.getKind()); + fragmentBuilder.setContents(getFragmentSerdeHelper(conf, fragment.getKind()).serialize(fragment).toByteString()); + return fragmentBuilder.build(); } - public static List toFragmentProtoList(Fragment... fragments) { + public static List toFragmentProtoList(Configuration conf, Fragment... fragments) { List list = Lists.newArrayList(); if (fragments == null) { return list; } for (Fragment fragment : fragments) { - list.add(fragment.getProto()); + list.add(toFragmentProto(conf, fragment)); } return list; } - public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) { - List list = toFragmentProtoList(fragments); + public static FragmentProto [] toFragmentProtoArray(Configuration conf, Fragment... fragments) { + List list = toFragmentProtoList(conf, fragments); return list.toArray(new FragmentProto[list.size()]); } } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerdeHelper.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerdeHelper.java new file mode 100644 index 0000000000..c76a156685 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerdeHelper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.fragment; + +import com.google.protobuf.Message; +import com.google.protobuf.GeneratedMessage.Builder; + +/** + * FragmentSerdeHelper abstracts how a fragment is serialized / deserialized to / from a Protocol Buffer message. + * + * @param Fragment class + * @param

Protocol Buffer Message class corresponding to the Fragment class + */ +public interface FragmentSerdeHelper { + + /** + * Creates a new builder of {@link P}. + * + * @return a Protocol Buffer message builder + */ + Builder newBuilder(); + + /** + * Serializes a fragment into a Protocol Buffer message. + * + * @param fragment + * @return a serialized Protocol Buffer message + */ + P serialize(F fragment); + + /** + * Deserializes a Protocol Buffer message to a fragment. + * + * @param proto + * @return a deserialized fragment instance + */ + F deserialize(P proto); +} diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 2454714452..29fe5f6d56 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -44,52 +44,28 @@ - tajo.storage.fragment.text.class + tajo.storage.fragment.kind.file org.apache.tajo.storage.fragment.FileFragment - tajo.storage.fragment.json.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.raw.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.draw.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.rcfile.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.row.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.parquet.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.orc.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.kind.hbase + org.apache.tajo.storage.hbase.HBaseFragment - tajo.storage.fragment.sequencefile.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.kind.jdbc + org.apache.tajo.storage.jdbc.JdbcFragment - tajo.storage.fragment.avro.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.serde-helper.file + org.apache.tajo.storage.fragment.FileFragmentSerdeHelper - tajo.storage.fragment.hbase.class - org.apache.tajo.storage.hbase.HBaseFragment + tajo.storage.fragment.serde-helper.hbase + org.apache.tajo.storage.hbase.HBaseFragmentSerdeHelper - tajo.storage.fragment.jdbc.class - org.apache.tajo.storage.jdbc.JdbcFragment + tajo.storage.fragment.serde-helper.jdbc + org.apache.tajo.storage.jdbc.JdbcFragmentSerdeHelper diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index 1c4530a3cd..5a99956e39 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -43,52 +43,28 @@ - tajo.storage.fragment.text.class + tajo.storage.fragment.kind.file org.apache.tajo.storage.fragment.FileFragment - tajo.storage.fragment.json.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.raw.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.draw.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.rcfile.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.row.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.parquet.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.orc.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.kind.hbase + org.apache.tajo.storage.hbase.HBaseFragment - tajo.storage.fragment.sequencefile.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.kind.jdbc + org.apache.tajo.storage.jdbc.JdbcFragment - tajo.storage.fragment.avro.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.serde-helper.file + org.apache.tajo.storage.fragment.FileFragmentSerdeHelper - tajo.storage.fragment.hbase.class - org.apache.tajo.storage.hbase.HBaseFragment + tajo.storage.fragment.serde-helper.hbase + org.apache.tajo.storage.hbase.HBaseFragmentSerdeHelper - tajo.storage.fragment.jdbc.class - org.apache.tajo.storage.jdbc.JdbcFragment + tajo.storage.fragment.serde-helper.jdbc + org.apache.tajo.storage.jdbc.JdbcFragmentSerdeHelper diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java index 18aa515b93..e1026bb254 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java @@ -19,112 +19,49 @@ package org.apache.tajo.storage.hbase; import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tajo.BuiltinStorages; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.storage.fragment.BuiltinFragmentKinds; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto; +import org.apache.tajo.storage.hbase.HBaseFragment.HBaseFragmentKey; import java.net.URI; -public class HBaseFragment implements Fragment, Comparable, Cloneable { - @Expose - private URI uri; - @Expose - private String tableName; - @Expose +/** + * Fragment for HBase + */ +public class HBaseFragment extends Fragment { private String hbaseTableName; - @Expose - private byte[] startRow; - @Expose - private byte[] stopRow; - @Expose - private String regionLocation; - @Expose private boolean last; - @Expose - private long length; public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) { - this.uri = uri; - this.tableName = tableName; + super(BuiltinFragmentKinds.HBASE, uri, tableName, new HBaseFragmentKey(startRow), new HBaseFragmentKey(stopRow), + TajoConstants.UNKNOWN_LENGTH, new String[]{regionLocation}); + this.hbaseTableName = hbaseTableName; - this.startRow = startRow; - this.stopRow = stopRow; - this.regionLocation = regionLocation; this.last = false; } - public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException { - HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); - builder.mergeFrom(raw); - builder.build(); - init(builder.build()); - } - - private void init(HBaseFragmentProto proto) { - this.uri = URI.create(proto.getUri()); - this.tableName = proto.getTableName(); - this.hbaseTableName = proto.getHbaseTableName(); - this.startRow = proto.getStartRow().toByteArray(); - this.stopRow = proto.getStopRow().toByteArray(); - this.regionLocation = proto.getRegionLocation(); - this.length = proto.getLength(); - this.last = proto.getLast(); - } - - @Override - public int compareTo(HBaseFragment t) { - return Bytes.compareTo(startRow, t.startRow); - } - - public URI getUri() { - return uri; - } - - @Override - public String getTableName() { - return tableName; - } - - @Override - public String getKey() { - return new String(startRow); + public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, + String regionLocation, boolean last) { + this(uri, tableName, hbaseTableName, startRow, stopRow, regionLocation); + this.last = last; } @Override public boolean isEmpty() { - return startRow == null || stopRow == null; - } - - @Override - public long getLength() { - return length; + return startKey.isEmpty() || endKey.isEmpty(); } public void setLength(long length) { this.length = length; } - @Override - public String[] getHosts() { - return new String[] {regionLocation}; - } - public Object clone() throws CloneNotSupportedException { HBaseFragment frag = (HBaseFragment) super.clone(); - frag.uri = uri; - frag.tableName = tableName; frag.hbaseTableName = hbaseTableName; - frag.startRow = startRow; - frag.stopRow = stopRow; - frag.regionLocation = regionLocation; frag.last = last; - frag.length = length; return frag; } @@ -132,9 +69,9 @@ public Object clone() throws CloneNotSupportedException { public boolean equals(Object o) { if (o instanceof HBaseFragment) { HBaseFragment t = (HBaseFragment) o; - if (tableName.equals(t.tableName) - && Bytes.equals(startRow, t.startRow) - && Bytes.equals(stopRow, t.stopRow)) { + if (inputSourceId.equals(t.inputSourceId) + && startKey.equals(t.startKey) + && endKey.equals(t.endKey)) { return true; } } @@ -143,51 +80,19 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow); + return Objects.hashCode(inputSourceId, hbaseTableName, startKey, endKey); } @Override public String toString() { return - "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ tableName + + "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ inputSourceId + "\", hbaseTableName\": \"" + hbaseTableName + "\"" + - ", \"startRow\": \"" + new String(startRow) + "\"" + - ", \"stopRow\": \"" + new String(stopRow) + "\"" + + ", \"startRow\": \"" + new String(startKey.bytes) + "\"" + + ", \"stopRow\": \"" + new String(endKey.bytes) + "\"" + ", \"length\": \"" + length + "\"}" ; } - @Override - public FragmentProto getProto() { - HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); - builder - .setUri(uri.toString()) - .setTableName(tableName) - .setHbaseTableName(hbaseTableName) - .setStartRow(ByteString.copyFrom(startRow)) - .setStopRow(ByteString.copyFrom(stopRow)) - .setLast(last) - .setLength(length) - .setRegionLocation(regionLocation); - - FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); - fragmentBuilder.setId(this.tableName); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - fragmentBuilder.setDataFormat(BuiltinStorages.HBASE); - return fragmentBuilder.build(); - } - - public byte[] getStartRow() { - return startRow; - } - - public byte[] getStopRow() { - return stopRow; - } - - public String getRegionLocation() { - return regionLocation; - } - public boolean isLast() { return last; } @@ -200,15 +105,51 @@ public String getHbaseTableName() { return hbaseTableName; } - public void setHbaseTableName(String hbaseTableName) { - this.hbaseTableName = hbaseTableName; - } - public void setStartRow(byte[] startRow) { - this.startRow = startRow; + this.startKey = new HBaseFragmentKey(startRow); } public void setStopRow(byte[] stopRow) { - this.stopRow = stopRow; + this.endKey = new HBaseFragmentKey(stopRow); + } + + public static class HBaseFragmentKey implements Comparable { + private final byte[] bytes; + + public HBaseFragmentKey(byte[] key) { + this.bytes = key; + } + + public byte[] getBytes() { + return bytes; + } + + @Override + public int hashCode() { + return Bytes.hashCode(bytes); + } + + @Override + public boolean equals(Object o) { + if (o instanceof HBaseFragmentKey) { + HBaseFragmentKey other = (HBaseFragmentKey) o; + return Bytes.equals(bytes, other.bytes); + } + return false; + } + + @Override + public int compareTo(HBaseFragmentKey o) { + return Bytes.compareTo(bytes, o.bytes); + } + + @Override + public String toString() { + return new String(bytes); + } + + public boolean isEmpty() { + return this.bytes == null; + } } } diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerdeHelper.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerdeHelper.java new file mode 100644 index 0000000000..16ade2dc75 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerdeHelper.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessage.Builder; +import org.apache.tajo.storage.fragment.FragmentSerdeHelper; +import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto; + +import java.net.URI; + +public class HBaseFragmentSerdeHelper implements FragmentSerdeHelper { + + @Override + public Builder newBuilder() { + return HBaseFragmentProto.newBuilder(); + } + + @Override + public HBaseFragmentProto serialize(HBaseFragment fragment) { + return HBaseFragmentProto.newBuilder() + .setUri(fragment.getUri().toASCIIString()) + .setTableName(fragment.getInputSourceId()) + .setHbaseTableName(fragment.getHbaseTableName()) + .setStartRow(ByteString.copyFrom(fragment.getStartKey().getBytes())) + .setStopRow(ByteString.copyFrom(fragment.getEndKey().getBytes())) + .setLast(fragment.isLast()) + .setLength(fragment.getLength()) + .setRegionLocation(fragment.getHostNames().get(0)) + .build(); + } + + @Override + public HBaseFragment deserialize(HBaseFragmentProto proto) { + return new HBaseFragment( + URI.create(proto.getUri()), + proto.getTableName(), + proto.getHbaseTableName(), + proto.getStartRow().toByteArray(), + proto.getStopRow().toByteArray(), + proto.getRegionLocation(), + proto.getLast()); + } +} diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index b2ca02dffc..781f911b4f 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -36,10 +36,16 @@ import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.exception.*; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.storage.*; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.BytesUtils; @@ -175,16 +181,16 @@ private void initScanner() throws IOException { } } - scan.setStartRow(fragment.getStartRow()); - if (fragment.isLast() && fragment.getStopRow() != null && - fragment.getStopRow().length > 0) { + scan.setStartRow(fragment.getStartKey().getBytes()); + if (fragment.isLast() && !fragment.getEndKey().isEmpty() && + fragment.getEndKey().getBytes().length > 0) { // last and stopRow is not empty if (filters == null) { filters = new FilterList(); } - filters.addFilter(new InclusiveStopFilter(fragment.getStopRow())); + filters.addFilter(new InclusiveStopFilter(fragment.getEndKey().getBytes())); } else { - scan.setStopRow(fragment.getStopRow()); + scan.setStopRow(fragment.getEndKey().getBytes()); } if (filters != null) { diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 132ceff0ae..92a5df0f07 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -525,10 +525,10 @@ private Collection convertRangeToFragment( if (fragmentMap.containsKey(regionStartKey)) { final HBaseFragment prevFragment = fragmentMap.get(regionStartKey); - if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { + if (Bytes.compareTo(fragmentStart, prevFragment.getStartKey().getBytes()) < 0) { prevFragment.setStartRow(fragmentStart); } - if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { + if (Bytes.compareTo(fragmentStop, prevFragment.getEndKey().getBytes()) > 0) { prevFragment.setStopRow(fragmentStop); } } else { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 2aa2b91025..28c1bb969c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -447,8 +447,8 @@ public long getMinSplitSize() { /** * Get Disk Ids by Volume Bytes */ - private int[] getDiskIds(VolumeId[] volumeIds) { - int[] diskIds = new int[volumeIds.length]; + private Integer[] getDiskIds(VolumeId[] volumeIds) { + Integer[] diskIds = new Integer[volumeIds.length]; for (int i = 0; i < volumeIds.length; i++) { int diskId = -1; if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 6d8443eab6..a6850c14c2 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -92,7 +92,7 @@ public void init() throws IOException { fis = new FileInputStream(file); channel = fis.getChannel(); filePosition = startOffset = fragment.getStartKey(); - endOffset = fragment.getStartKey() + fragment.getLength(); + endOffset = fragment.getEndKey(); if (LOG.isDebugEnabled()) { LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size() diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java deleted file mode 100644 index 958e1c60d9..0000000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ /dev/null @@ -1,502 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.exception.TajoRuntimeException; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.serder.PlanProto.ShuffleType; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.BitArray; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Arrays; - -public class RowFile { - public static final Log LOG = LogFactory.getLog(RowFile.class); - - private static final int SYNC_ESCAPE = -1; - private static final int SYNC_HASH_SIZE = 16; - private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; - private final static int DEFAULT_BUFFER_SIZE = 65535; - public static int SYNC_INTERVAL; - - public static class RowFileScanner extends FileScanner { - private FileSystem fs; - private FSDataInputStream in; - private Tuple tuple; - - private byte[] sync = new byte[SYNC_HASH_SIZE]; - private byte[] checkSync = new byte[SYNC_HASH_SIZE]; - private long start, end; - - private ByteBuffer buffer; - private final int tupleHeaderSize; - private BitArray nullFlags; - private long bufferStartPos; - - public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - - SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname, - ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE; - - nullFlags = new BitArray(schema.size()); - tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8); - this.start = this.fragment.getStartKey(); - this.end = this.start + this.fragment.getLength(); - } - - public void init() throws IOException { - // set default page size. - fs = fragment.getPath().getFileSystem(conf); - in = fs.open(fragment.getPath()); - buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size()); - buffer.flip(); - - readHeader(); - - // find the correct position from the start - if (this.start > in.getPos()) { - long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0; - in.seek(realStart); - } - bufferStartPos = in.getPos(); - fillBuffer(); - - if (start != 0) { - // TODO: improve - boolean syncFound = false; - while (!syncFound) { - if (buffer.remaining() < SYNC_SIZE) { - fillBuffer(); - } - buffer.mark(); - syncFound = checkSync(); - if (!syncFound) { - buffer.reset(); - buffer.get(); // proceed one byte - } - } - bufferStartPos += buffer.position(); - buffer.compact(); - buffer.flip(); - } - - tuple = new VTuple(schema.size()); - - super.init(); - } - - private void readHeader() throws IOException { - SYNC_INTERVAL = in.readInt(); - StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE); - } - - /** - * Find the sync from the front of the buffer - * - * @return return true if it succeeds to find the sync. - * @throws java.io.IOException - */ - private boolean checkSync() throws IOException { - buffer.getInt(); // escape - buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync - return Arrays.equals(checkSync, sync); - } - - private int fillBuffer() throws IOException { - bufferStartPos += buffer.position(); - buffer.compact(); - int remain = buffer.remaining(); - int read = in.read(buffer); - if (read == -1) { - buffer.flip(); - return read; - } else { - int totalRead = read; - if (remain > totalRead) { - read = in.read(buffer); - totalRead += read > 0 ? read : 0; - } - buffer.flip(); - return totalRead; - } - } - - @Override - public Tuple next() throws IOException { - while (buffer.remaining() < SYNC_SIZE) { - if (fillBuffer() < 0) { - return null; - } - } - - buffer.mark(); - if (!checkSync()) { - buffer.reset(); - } else { - if (bufferStartPos + buffer.position() > end) { - return null; - } - } - - while (buffer.remaining() < tupleHeaderSize) { - if (fillBuffer() < 0) { - return null; - } - } - - int i; - - int nullFlagSize = buffer.getShort(); - byte[] nullFlagBytes = new byte[nullFlagSize]; - buffer.get(nullFlagBytes, 0, nullFlagSize); - nullFlags = new BitArray(nullFlagBytes); - int tupleSize = buffer.getShort(); - - while (buffer.remaining() < (tupleSize)) { - if (fillBuffer() < 0) { - return null; - } - } - - Datum datum; - Column col; - for (i = 0; i < schema.size(); i++) { - if (!nullFlags.get(i)) { - col = schema.getColumn(i); - switch (col.getDataType().getType()) { - case BOOLEAN : - datum = DatumFactory.createBool(buffer.get()); - tuple.put(i, datum); - break; - - case BIT: - datum = DatumFactory.createBit(buffer.get()); - tuple.put(i, datum ); - break; - - case CHAR : - int realLen = buffer.getInt(); - byte[] buf = new byte[col.getDataType().getLength()]; - buffer.get(buf); - byte[] charBuf = Arrays.copyOf(buf, realLen); - tuple.put(i, DatumFactory.createChar(charBuf)); - break; - - case INT2 : - datum = DatumFactory.createInt2(buffer.getShort()); - tuple.put(i, datum ); - break; - - case INT4 : - datum = DatumFactory.createInt4(buffer.getInt()); - tuple.put(i, datum ); - break; - - case INT8 : - datum = DatumFactory.createInt8(buffer.getLong()); - tuple.put(i, datum ); - break; - - case FLOAT4 : - datum = DatumFactory.createFloat4(buffer.getFloat()); - tuple.put(i, datum); - break; - - case FLOAT8 : - datum = DatumFactory.createFloat8(buffer.getDouble()); - tuple.put(i, datum); - break; - - case TEXT: - short bytelen = buffer.getShort(); - byte[] strbytes = new byte[bytelen]; - buffer.get(strbytes, 0, bytelen); - datum = DatumFactory.createText(strbytes); - tuple.put(i, datum); - break; - - case BLOB: - short bytesLen = buffer.getShort(); - byte [] bytesBuf = new byte[bytesLen]; - buffer.get(bytesBuf); - datum = DatumFactory.createBlob(bytesBuf); - tuple.put(i, datum); - break; - - default: - break; - } - } else { - tuple.put(i, DatumFactory.createNullDatum()); - } - } - return tuple; - } - - @Override - public void reset() throws IOException { - init(); - } - - @Override - public void close() throws IOException { - if (in != null) { - in.close(); - } - } - - @Override - public boolean isProjectable() { - return false; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setFilter(EvalNode filter) { - throw new TajoRuntimeException(new UnsupportedException()); - } - - @Override - public boolean isSplittable(){ - return true; - } - } - - public static class RowFileAppender extends FileAppender { - private FSDataOutputStream out; - private long lastSyncPos; - private FileSystem fs; - private byte[] sync; - private ByteBuffer buffer; - - private BitArray nullFlags; - // statistics - private TableStatistics stats; - private ShuffleType shuffleType; - - public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId, - final Schema schema, final TableMeta meta, final Path workDir) - throws IOException { - super(conf, taskAttemptId, schema, meta, workDir); - } - - public void init() throws IOException { - SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname, - ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal); - fs = path.getFileSystem(conf); - - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - - sync = new byte[SYNC_HASH_SIZE]; - lastSyncPos = 0; - - out = fs.create(path); - - MessageDigest md; - try { - md = MessageDigest.getInstance("MD5"); - md.update((path.toString()+System.currentTimeMillis()).getBytes()); - sync = md.digest(); - } catch (NoSuchAlgorithmException e) { - LOG.error(e); - } - - writeHeader(); - - buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - - nullFlags = new BitArray(schema.size()); - - if (tableStatsEnabled) { - this.stats = new TableStatistics(this.schema, columnStatsEnabled); - this.shuffleType = PlannerUtil.getShuffleType( - meta.getProperty(StorageConstants.SHUFFLE_TYPE, - PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE))); - } - } - - private void writeHeader() throws IOException { - out.writeInt(SYNC_INTERVAL); - out.write(sync); - out.flush(); - lastSyncPos = out.getPos(); - } - - @Override - public void addTuple(Tuple t) throws IOException { - checkAndWriteSync(); - Column col; - - buffer.clear(); - nullFlags.clear(); - - for (int i = 0; i < schema.size(); i++) { - if (shuffleType == ShuffleType.RANGE_SHUFFLE) { - // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, t); - } - - if (t.isBlankOrNull(i)) { - nullFlags.set(i); - } else { - col = schema.getColumn(i); - switch (col.getDataType().getType()) { - case BOOLEAN: - buffer.put(t.getByte(i)); - break; - case BIT: - buffer.put(t.getByte(i)); - break; - case CHAR: - byte[] src = t.getBytes(i); - byte[] dst = Arrays.copyOf(src, col.getDataType().getLength()); - buffer.putInt(src.length); - buffer.put(dst); - break; - case TEXT: - byte [] strbytes = t.getBytes(i); - buffer.putShort((short)strbytes.length); - buffer.put(strbytes, 0, strbytes.length); - break; - case INT2: - buffer.putShort(t.getInt2(i)); - break; - case INT4: - buffer.putInt(t.getInt4(i)); - break; - case INT8: - buffer.putLong(t.getInt8(i)); - break; - case FLOAT4: - buffer.putFloat(t.getFloat4(i)); - break; - case FLOAT8: - buffer.putDouble(t.getFloat8(i)); - break; - case BLOB: - byte [] bytes = t.getBytes(i); - buffer.putShort((short)bytes.length); - buffer.put(bytes); - break; - case NULL_TYPE: - nullFlags.set(i); - break; - default: - break; - } - } - } - - byte[] bytes = nullFlags.toArray(); - out.writeShort(bytes.length); - out.write(bytes); - - bytes = buffer.array(); - int dataLen = buffer.position(); - out.writeShort(dataLen); - out.write(bytes, 0, dataLen); - - // Statistical section - if (tableStatsEnabled) { - stats.incrementRow(); - } - } - - @Override - public long getOffset() throws IOException { - return out.getPos(); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - if (out != null) { - if (tableStatsEnabled) { - stats.setNumBytes(out.getPos()); - } - sync(); - out.flush(); - IOUtils.cleanup(LOG, out); - } - } - - private void sync() throws IOException { - if (lastSyncPos != out.getPos()) { - out.writeInt(SYNC_ESCAPE); - out.write(sync); - lastSyncPos = out.getPos(); - } - } - - private void checkAndWriteSync() throws IOException { - if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) { - sync(); - } - } - - @Override - public TableStats getStats() { - if (tableStatsEnabled) { - return stats.getTableStat(); - } else { - return null; - } - } - } -} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index 8f18c7a0f0..7bdf0cbee3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -19,161 +19,61 @@ package org.apache.tajo.storage.fragment; import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; -import org.apache.tajo.BuiltinStorages; -import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto; +import org.apache.tajo.storage.DataLocation; import org.apache.tajo.util.TUtil; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public class FileFragment implements Fragment, Comparable, Cloneable { - @Expose private String tableName; // required - @Expose private Path uri; // required - @Expose public Long startOffset; // required - @Expose public Long length; // required - - private String[] hosts; // Datanode hostnames - @Expose private int[] diskIds; - - public FileFragment(ByteString raw) throws InvalidProtocolBufferException { - FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); - builder.mergeFrom(raw); - builder.build(); - init(builder.build()); - } +/** + * Fragment for file systems. + */ +public class FileFragment extends Fragment { + private Integer[] diskIds; // disk volume ids public FileFragment(String tableName, Path uri, BlockLocation blockLocation) throws IOException { - this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null); + this(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null); } - public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) { - this.set(tableName, uri, start, length, hosts, diskIds); + public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, Integer[] diskIds) { + super(BuiltinFragmentKinds.FILE, uri.toUri(), tableName, start, start + length, length, hosts); + this.diskIds = diskIds; } + // Non splittable public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) { - this.set(tableName, uri, start, length, hosts, null); + this(tableName, uri, start, length, hosts, null); } public FileFragment(String fragmentId, Path path, long start, long length) { - this.set(fragmentId, path, start, length, null, null); - } - - public FileFragment(FileFragmentProto proto) { - init(proto); - } - - private void init(FileFragmentProto proto) { - int[] diskIds = new int[proto.getDiskIdsList().size()]; - int i = 0; - for(Integer eachValue: proto.getDiskIdsList()) { - diskIds[i++] = eachValue; - } - List var = proto.getHostsList(); - this.set(proto.getId(), new Path(proto.getPath()), - proto.getStartOffset(), proto.getLength(), - var.toArray(new String[var.size()]), - diskIds); - } - - private void set(String tableName, Path path, long start, - long length, String[] hosts, int[] diskIds) { - this.tableName = tableName; - this.uri = path; - this.startOffset = start; - this.length = length; - this.hosts = hosts; - this.diskIds = diskIds; - } - - - /** - * Get the list of hosts (hostname) hosting this block - */ - public String[] getHosts() { - if (hosts == null) { - this.hosts = new String[0]; - } - return hosts; + this(fragmentId, path, start, length, null, null); } /** * Get the list of Disk Ids * Unknown disk is -1. Others 0 ~ N */ - public int[] getDiskIds() { + public Integer[] getDiskIds() { if (diskIds == null) { - this.diskIds = new int[getHosts().length]; - Arrays.fill(this.diskIds, -1); + this.diskIds = new Integer[getHostNames().size()]; + Arrays.fill(this.diskIds, DataLocation.UNKNOWN_VOLUME_ID); } return diskIds; } - public void setDiskIds(int[] diskIds){ + public void setDiskIds(Integer[] diskIds){ this.diskIds = diskIds; } - @Override - public String getTableName() { - return this.tableName; - } - public Path getPath() { - return this.uri; + return new Path(uri); } public void setPath(Path path) { - this.uri = path; - } - - public Long getStartKey() { - return this.startOffset; - } - - @Override - public String getKey() { - return this.uri.toString(); - } - - @Override - public long getLength() { - return this.length; - } - - @Override - public boolean isEmpty() { - return this.length <= 0; - } - /** - * - * The offset range of tablets MUST NOT be overlapped. - * - * @param t - * @return If the table paths are not same, return -1. - */ - @Override - public int compareTo(FileFragment t) { - if (getPath().equals(t.getPath())) { - long diff = this.getStartKey() - t.getStartKey(); - if (diff < 0) { - return -1; - } else if (diff > 0) { - return 1; - } else { - return 0; - } - } else { - return -1; - } + this.uri = path.toUri(); } @Override @@ -191,48 +91,20 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(tableName, uri, startOffset, length); + return Objects.hashCode(inputSourceId, uri, startKey, endKey, length, diskIds, hostNames); } - + + @Override public Object clone() throws CloneNotSupportedException { FileFragment frag = (FileFragment) super.clone(); - frag.tableName = tableName; - frag.uri = uri; frag.diskIds = diskIds; - frag.hosts = hosts; - return frag; } @Override public String toString() { - return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": " + return "\"fragment\": {\"id\": \""+ inputSourceId +"\", \"path\": " +getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": " + getLength() + "}" ; } - - public FragmentProto getProto() { - FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); - builder.setId(this.tableName); - builder.setStartOffset(this.startOffset); - builder.setLength(this.length); - builder.setPath(this.uri.toString()); - if(diskIds != null) { - List idList = new ArrayList<>(); - for(int eachId: diskIds) { - idList.add(eachId); - } - builder.addAllDiskIds(idList); - } - - if(hosts != null) { - builder.addAllHosts(Arrays.asList(hosts)); - } - - FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); - fragmentBuilder.setId(this.tableName); - fragmentBuilder.setDataFormat(BuiltinStorages.TEXT); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - return fragmentBuilder.build(); - } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerdeHelper.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerdeHelper.java new file mode 100644 index 0000000000..5633ac387f --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerdeHelper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.fragment; + +import com.google.protobuf.GeneratedMessage.Builder; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto; + +import java.util.ArrayList; +import java.util.List; + +public class FileFragmentSerdeHelper implements FragmentSerdeHelper { + + @Override + public Builder newBuilder() { + return FileFragmentProto.newBuilder(); + } + + @Override + public FileFragmentProto serialize(FileFragment fragment) { + FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); + builder.setId(fragment.inputSourceId); + builder.setStartOffset(fragment.startKey); + builder.setLength(fragment.length); + builder.setPath(fragment.getPath().toString()); + if(fragment.getDiskIds() != null) { + List idList = new ArrayList<>(); + for(int eachId: fragment.getDiskIds()) { + idList.add(eachId); + } + builder.addAllDiskIds(idList); + } + + if(fragment.hostNames != null) { + builder.addAllHosts(fragment.hostNames); + } + return builder.build(); + } + + @Override + public FileFragment deserialize(FileFragmentProto proto) { + return new FileFragment( + proto.getId(), + new Path(proto.getPath()), + proto.getStartOffset(), + proto.getLength(), + proto.getHostsList().toArray(new String[proto.getHostsCount()]), + proto.getDiskIdsList().toArray(new Integer[proto.getDiskIdsCount()])); + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index 964a58ab53..a2688b1213 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -88,7 +88,7 @@ public void init() throws IOException { } pos = startOffset = fragment.getStartKey(); - end = startOffset + fragment.getLength(); + end = fragment.getEndKey(); if (codec != null) { fis = fs.open(fragment.getPath()); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index e112b0daf5..d52f46da69 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -291,7 +291,7 @@ public DelimitedTextFileScanner(Configuration conf, final Schema schema, final T } startOffset = this.fragment.getStartKey(); - endOffset = startOffset + fragment.getLength(); + endOffset = this.fragment.getEndKey(); errorTorrenceMaxNum = Integer.parseInt(meta.getProperty(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM)); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java index 58fea2bf94..a29e86bdd0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java @@ -99,7 +99,7 @@ public OrcRecordReader(List stripes, long rows = 0; long skippedRows = 0; long offset = fragment.getStartKey(); - long maxOffset = fragment.getStartKey() + fragment.getLength(); + long maxOffset = fragment.getEndKey(); for(StripeInformation stripe: stripes) { long stripeStart = stripe.getOffset(); if (offset > stripeStart) { diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index dc8781ef38..e6e8cb5dd1 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -159,14 +159,14 @@ public void testGetSplit() throws Exception { splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); assertEquals(testCount, splits.size()); // -1 is unknown volumeId - assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); + assertEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue()); splits.clear(); splits.addAll(space.getSplits("data", meta, schema, partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2]))); assertEquals(testCount / 2, splits.size()); - assertEquals(1, splits.get(0).getHosts().length); - assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); + assertEquals(1, splits.get(0).getHostNames().size()); + assertEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue()); fs.close(); } finally { @@ -259,9 +259,9 @@ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { splits.addAll(sm.getSplits("data", meta, schema, tablePath)); assertEquals(testCount, splits.size()); - assertEquals(2, splits.get(0).getHosts().length); + assertEquals(2, splits.get(0).getHostNames().size()); assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length); - assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); + assertNotEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue()); fs.close(); } finally { diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index 0e6fde5bdd..7a9b074176 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.catalog.*; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; @@ -74,8 +75,8 @@ public TestBSTIndex(String type) { @Parameters(name = "{index}: {0}") public static Collection generateParameters() { return Arrays.asList(new Object[][]{ - {"RAW"}, - {"TEXT"} + {BuiltinStorages.RAW}, + {BuiltinStorages.TEXT} }); } @@ -128,7 +129,7 @@ public void testFindValue() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -152,7 +153,7 @@ public void testFindValue() throws IOException { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp); reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { @@ -233,7 +234,7 @@ public void testBuildIndexWithAppender() throws IOException { keySchema, comp); reader.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { @@ -298,7 +299,7 @@ public void testFindOmittedValue() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -372,7 +373,7 @@ public void testFindNextKeyValue() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -396,7 +397,7 @@ public void testFindNextKeyValue() throws IOException { keySchema, comp); reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple result; @@ -466,7 +467,7 @@ public void testFindNextKeyOmittedValue() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -490,7 +491,7 @@ public void testFindNextKeyOmittedValue() throws IOException { keySchema, comp); reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple result; @@ -549,7 +550,7 @@ public void testFindMinValue() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -575,7 +576,7 @@ public void testFindMinValue() throws IOException { keySchema, comp); reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); tuple.put(0, DatumFactory.createInt8(0)); @@ -636,7 +637,7 @@ public void testMinMax() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -744,7 +745,7 @@ public void testConcurrentAccess() throws IOException, InterruptedException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -828,7 +829,7 @@ public void testFindValueDescOrder() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -854,7 +855,7 @@ public void testFindValueDescOrder() throws IOException { keySchema, comp); reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); for (int i = (TUPLE_NUM - 1); i > 0; i--) { @@ -921,7 +922,7 @@ public void testFindNextKeyValueDescOrder() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -950,7 +951,7 @@ public void testFindNextKeyValueDescOrder() throws IOException { assertEquals(comp, reader.getComparator()); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple result; @@ -1023,7 +1024,7 @@ public void testFindValueASCOrder() throws IOException { creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); Tuple keyTuple; @@ -1047,7 +1048,7 @@ public void testFindValueASCOrder() throws IOException { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp); reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). - getSeekableScanner(meta, schema, tablet.getProto(), schema); + getSeekableScanner(meta, schema, tablet, schema); scanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 4f6b566e86..cca7920a67 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -114,7 +114,7 @@ public void testFindValueInSingleCSV() throws IOException { creater.init(); SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) - .getSeekableScanner(meta, schema, tablet.getProto(), schema); + .getSeekableScanner(meta, schema, tablet, schema); fileScanner.init(); Tuple keyTuple; long offset; @@ -139,7 +139,7 @@ public void testFindValueInSingleCSV() throws IOException { "FindValueInCSV.idx"), keySchema, comp); reader.init(); fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) - .getSeekableScanner(meta, schema, tablet.getProto(), schema); + .getSeekableScanner(meta, schema, tablet, schema); fileScanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { tuple.put(0, DatumFactory.createInt8(i)); @@ -206,7 +206,7 @@ public void testFindNextKeyValueInSingleCSV() throws IOException { creater.init(); SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) - .getSeekableScanner(meta, schema, tablet.getProto(), schema); + .getSeekableScanner(meta, schema, tablet, schema); fileScanner.init(); Tuple keyTuple; long offset; @@ -228,7 +228,7 @@ public void testFindNextKeyValueInSingleCSV() throws IOException { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); reader.init(); fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) - .getSeekableScanner(meta, schema, tablet.getProto(), schema); + .getSeekableScanner(meta, schema, tablet, schema); fileScanner.init(); Tuple result; for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index 3283f9faab..16317aead0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -39,44 +39,28 @@ - tajo.storage.fragment.text.class + tajo.storage.fragment.kind.file org.apache.tajo.storage.fragment.FileFragment - tajo.storage.fragment.json.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.kind.hbase + org.apache.tajo.storage.hbase.HBaseFragment - tajo.storage.fragment.raw.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.kind.jdbc + org.apache.tajo.storage.jdbc.JdbcFragment - tajo.storage.fragment.draw.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.serde-helper.file + org.apache.tajo.storage.fragment.FileFragmentSerdeHelper - tajo.storage.fragment.rcfile.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.serde-helper.hbase + org.apache.tajo.storage.hbase.HBaseFragmentSerdeHelper - tajo.storage.fragment.row.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.parquet.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.orc.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.sequencefile.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.avro.class - org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.serde-helper.jdbc + org.apache.tajo.storage.jdbc.JdbcFragmentSerdeHelper diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java index 0088504c5a..20c1acad5c 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java @@ -18,90 +18,34 @@ package org.apache.tajo.storage.jdbc; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.storage.fragment.BuiltinFragmentKinds; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto; -import java.util.Arrays; +import java.net.URI; +import java.util.List; -public class JdbcFragment implements Fragment, Comparable, Cloneable { - String uri; - String inputSourceId; - String [] hostNames; - - - public JdbcFragment(ByteString raw) throws InvalidProtocolBufferException { - JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder(); - builder.mergeFrom(raw); - builder.build(); - init(builder.build()); - } +/** + * Fragment for the systems which connects to Tajo via the JDBC interface. + */ +public class JdbcFragment extends Fragment { - public JdbcFragment(String inputSourceId, String uri) { - this.inputSourceId = inputSourceId; - this.uri = uri; - this.hostNames = extractHosts(uri); + // TODO: set start and end keys properly + public JdbcFragment(String inputSourceId, URI uri) { + super(BuiltinFragmentKinds.JDBC, uri, inputSourceId, null, null, TajoConstants.UNKNOWN_LENGTH, extractHosts(uri)); } - private void init(JdbcFragmentProto proto) { - this.uri = proto.getUri(); - this.inputSourceId = proto.getInputSourceId(); - this.hostNames = proto.getHostsList().toArray(new String [proto.getHostsCount()]); + public JdbcFragment(String inputSourceId, URI uri, List hostNames) { + super(BuiltinFragmentKinds.JDBC, uri, inputSourceId, null, null, TajoConstants.UNKNOWN_LENGTH, + hostNames.toArray(new String[hostNames.size()])); } - private String [] extractHosts(String uri) { + private static String[] extractHosts(URI uri) { return new String[] {ConnectionInfo.fromURI(uri).host}; } - @Override - public String getTableName() { - return inputSourceId; - } - - public String getUri() { - return uri; - } - - @Override - public long getLength() { - return 0; - } - - @Override - public String getKey() { - return null; - } - - @Override - public String[] getHosts() { - return hostNames; - } - @Override public boolean isEmpty() { return false; } - - @Override - public CatalogProtos.FragmentProto getProto() { - JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder(); - builder.setInputSourceId(this.inputSourceId); - builder.setUri(this.uri); - if(hostNames != null) { - builder.addAllHosts(Arrays.asList(hostNames)); - } - - CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder(); - fragmentBuilder.setId(this.inputSourceId); - fragmentBuilder.setDataFormat("JDBC"); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - return fragmentBuilder.build(); - } - - @Override - public int compareTo(JdbcFragment o) { - return this.uri.compareTo(o.uri); - } } diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerdeHelper.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerdeHelper.java new file mode 100644 index 0000000000..784bb78809 --- /dev/null +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerdeHelper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.jdbc; + +import com.google.protobuf.GeneratedMessage.Builder; +import org.apache.tajo.storage.fragment.FragmentSerdeHelper; +import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto; + +import java.net.URI; + +public class JdbcFragmentSerdeHelper implements FragmentSerdeHelper { + + @Override + public Builder newBuilder() { + return JdbcFragmentProto.newBuilder(); + } + + @Override + public JdbcFragmentProto serialize(JdbcFragment fragment) { + return JdbcFragmentProto.newBuilder() + .setInputSourceId(fragment.getInputSourceId()) + .setUri(fragment.getUri().toASCIIString()) + .addAllHosts(fragment.getHostNames()) + .build(); + } + + @Override + public JdbcFragment deserialize(JdbcFragmentProto proto) { + return new JdbcFragment(proto.getInputSourceId(), URI.create(proto.getUri()), proto.getHostsList()); + } +} diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java index d7e1c3b926..2a16060bda 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java @@ -255,7 +255,7 @@ protected void convertTuple(ResultSet resultSet, VTuple tuple) { private ResultSetIterator executeQueryAndGetIter() { try { LOG.info("Generated SQL: " + generatedSql); - Connection conn = DriverManager.getConnection(fragment.uri, connProperties); + Connection conn = DriverManager.getConnection(fragment.getUri().toASCIIString(), connProperties); Statement statement = conn.createStatement(); ResultSet resultset = statement.executeQuery(generatedSql); return new ResultSetIterator((resultset)); diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index fa6cf486e2..a2478ecd3b 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -123,7 +123,7 @@ public URI getTableUri(String databaseName, String tableName) { public List getSplits(String inputSourceId, TableDesc tableDesc, @Nullable EvalNode filterCondition) throws IOException { - return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString())); + return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri())); } @Override