diff --git a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java deleted file mode 100644 index e3fc3f7314..0000000000 --- a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; - -import cascading.tuple.Fields; - -import java.util.List; -import java.util.ArrayList; - -public class SchemaIntersection { - - private final MessageType requestedSchema; - private final Fields sourceFields; - - public SchemaIntersection(MessageType fileSchema, Fields requestedFields) { - if(requestedFields == Fields.UNKNOWN) - requestedFields = Fields.ALL; - - Fields newFields = Fields.NONE; - List newSchemaFields = new ArrayList(); - int schemaSize = fileSchema.getFieldCount(); - - for (int i = 0; i < schemaSize; i++) { - Type type = fileSchema.getType(i); - Fields name = new Fields(type.getName()); - - if(requestedFields.contains(name)) { - newFields = newFields.append(name); - newSchemaFields.add(type); - } - } - - this.sourceFields = newFields; - this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields); - } - - public MessageType getRequestedSchema() { - return requestedSchema; - } - - public Fields getSourceFields() { - return sourceFields; - } -} diff --git a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java deleted file mode 100644 index 7b3fa0ee7b..0000000000 --- a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import java.util.Map; -import java.util.StringJoiner; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; - -import cascading.tuple.Tuple; -import cascading.tuple.Fields; -import cascading.flow.hadoop.util.HadoopUtil; - -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.cascading.convert.TupleRecordMaterializer; - - -public class TupleReadSupport extends ReadSupport { - static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields"; - - static protected Fields getRequestedFields(Configuration configuration) { - String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS); - - if(fieldsString == null) - return Fields.ALL; - - String[] parts = fieldsString.split(":"); - if(parts.length == 0) - return Fields.ALL; - else - return new Fields(parts); - } - - static protected void setRequestedFields(JobConf configuration, Fields fields) { - StringJoiner joiner = new StringJoiner(":"); - fields.forEach(f -> joiner.add(f.toString())); - configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, joiner.toString()); - } - - @Override - public ReadContext init(Configuration configuration, Map keyValueMetaData, MessageType fileSchema) { - Fields requestedFields = getRequestedFields(configuration); - if (requestedFields == null) { - return new ReadContext(fileSchema); - } else { - SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields); - return new ReadContext(intersection.getRequestedSchema()); - } - } - - @Override - public RecordMaterializer prepareForRead( - Configuration configuration, - Map keyValueMetaData, - MessageType fileSchema, - ReadContext readContext) { - MessageType requestedSchema = readContext.getRequestedSchema(); - return new TupleRecordMaterializer(requestedSchema); - } - -} diff --git a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java deleted file mode 100644 index 0e5f13e448..0000000000 --- a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import cascading.tuple.TupleEntry; -import java.util.HashMap; -import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; - -public class TupleWriteSupport extends WriteSupport { - - private RecordConsumer recordConsumer; - private MessageType rootSchema; - public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema"; - - @Override - public String getName() { - return "cascading"; - } - - @Override - public WriteContext init(Configuration configuration) { - String schema = configuration.get(PARQUET_CASCADING_SCHEMA); - rootSchema = MessageTypeParser.parseMessageType(schema); - return new WriteContext(rootSchema, new HashMap()); - } - - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - this.recordConsumer = recordConsumer; - } - - @Override - public void write(TupleEntry record) { - recordConsumer.startMessage(); - final List fields = rootSchema.getFields(); - - for (int i = 0; i < fields.size(); i++) { - Type field = fields.get(i); - - if (record == null || record.getObject(field.getName()) == null) { - continue; - } - recordConsumer.startField(field.getName(), i); - if (field.isPrimitive()) { - writePrimitive(record, field.asPrimitiveType()); - } else { - throw new UnsupportedOperationException("Complex type not implemented"); - } - recordConsumer.endField(field.getName(), i); - } - recordConsumer.endMessage(); - } - - private void writePrimitive(TupleEntry record, PrimitiveType field) { - switch (field.getPrimitiveTypeName()) { - case BINARY: - recordConsumer.addBinary(Binary.fromString(record.getString(field.getName()))); - break; - case BOOLEAN: - recordConsumer.addBoolean(record.getBoolean(field.getName())); - break; - case INT32: - recordConsumer.addInteger(record.getInteger(field.getName())); - break; - case INT64: - recordConsumer.addLong(record.getLong(field.getName())); - break; - case DOUBLE: - recordConsumer.addDouble(record.getDouble(field.getName())); - break; - case FLOAT: - recordConsumer.addFloat(record.getFloat(field.getName())); - break; - case FIXED_LEN_BYTE_ARRAY: - throw new UnsupportedOperationException("Fixed len byte array type not implemented"); - case INT96: - throw new UnsupportedOperationException("Int96 type not implemented"); - default: - throw new UnsupportedOperationException(field.getName() + " type not implemented"); - } - } -} diff --git a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java deleted file mode 100644 index 4c1240b859..0000000000 --- a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading.convert; - -import cascading.tuple.Tuple; - -import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.Converter; -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; -import org.apache.parquet.pig.TupleConversionException; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.Type; - -public class TupleConverter extends GroupConverter { - - protected Tuple currentTuple; - private final Converter[] converters; - - public TupleConverter(GroupType parquetSchema) { - int schemaSize = parquetSchema.getFieldCount(); - - this.converters = new Converter[schemaSize]; - for (int i = 0; i < schemaSize; i++) { - Type type = parquetSchema.getType(i); - converters[i] = newConverter(type, i); - } - } - - private Converter newConverter(Type type, int i) { - if(!type.isPrimitive()) { - throw new IllegalArgumentException("cascading can only build tuples from primitive types"); - } else { - return new TuplePrimitiveConverter(this, i); - } - } - - @Override - public Converter getConverter(int fieldIndex) { - return converters[fieldIndex]; - } - - @Override - final public void start() { - currentTuple = Tuple.size(converters.length); - } - - @Override - public void end() { - } - - final public Tuple getCurrentTuple() { - return currentTuple; - } - - static final class TuplePrimitiveConverter extends PrimitiveConverter { - private final TupleConverter parent; - private final int index; - - public TuplePrimitiveConverter(TupleConverter parent, int index) { - this.parent = parent; - this.index = index; - } - - @Override - public void addBinary(Binary value) { - parent.getCurrentTuple().setString(index, value.toStringUsingUTF8()); - } - - @Override - public void addBoolean(boolean value) { - parent.getCurrentTuple().setBoolean(index, value); - } - - @Override - public void addDouble(double value) { - parent.getCurrentTuple().setDouble(index, value); - } - - @Override - public void addFloat(float value) { - parent.getCurrentTuple().setFloat(index, value); - } - - @Override - public void addInt(int value) { - parent.getCurrentTuple().setInteger(index, value); - } - - @Override - public void addLong(long value) { - parent.getCurrentTuple().setLong(index, value); - } - } -} diff --git a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java deleted file mode 100644 index 275e17b3b8..0000000000 --- a/parquet-cascading-common23-deprecated/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading.convert; - -import cascading.tuple.Tuple; -import cascading.tuple.Fields; - -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.GroupType; - -public class TupleRecordMaterializer extends RecordMaterializer { - - private TupleConverter root; - - public TupleRecordMaterializer(GroupType parquetSchema) { - this.root = new TupleConverter(parquetSchema); - } - - @Override - public Tuple getCurrentRecord() { - return root.getCurrentTuple(); - } - - @Override - public GroupConverter getRootConverter() { - return root; - } - -} diff --git a/parquet-cascading-common23-deprecated/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading-common23-deprecated/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java deleted file mode 100644 index de350dd96c..0000000000 --- a/parquet-cascading-common23-deprecated/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import cascading.flow.Flow; -import cascading.flow.FlowProcess; -import cascading.flow.hadoop.HadoopFlowConnector; -import cascading.operation.BaseOperation; -import cascading.operation.Function; -import cascading.operation.FunctionCall; -import cascading.pipe.Each; -import cascading.pipe.Pipe; -import cascading.scheme.Scheme; -import cascading.scheme.hadoop.TextLine; -import cascading.tap.Tap; -import cascading.tap.hadoop.Hfs; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TIOStreamTransport; -import org.junit.Test; -import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter; -import org.apache.parquet.hadoop.util.ContextUtil; -import org.apache.parquet.thrift.test.Name; - -import java.io.ByteArrayOutputStream; -import java.io.File; - -import static org.junit.Assert.assertEquals; - -public class TestParquetTupleScheme { - final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in"; - final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out"; - - @Test - public void testReadPattern() throws Exception { - String sourceFolder = parquetInputPath; - testReadWrite(sourceFolder); - - String sourceGlobPattern = parquetInputPath + "/*"; - testReadWrite(sourceGlobPattern); - - String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*"; - testReadWrite(multiLevelGlobPattern); - } - - @Test - public void testFieldProjection() throws Exception { - createFileForRead(); - - Path path = new Path(txtOutputPath); - final FileSystem fs = path.getFileSystem(new Configuration()); - if (fs.exists(path)) fs.delete(path, true); - - Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name")); - Tap source = new Hfs(sourceScheme, parquetInputPath); - - Scheme sinkScheme = new TextLine(new Fields("last_name")); - Tap sink = new Hfs(sinkScheme, txtOutputPath); - - Pipe assembly = new Pipe("namecp"); - assembly = new Each(assembly, new ProjectedTupleFunction()); - Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); - - flow.complete(); - String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000")); - assertEquals("Practice\nHope\nHorse\n", result); - } - - public void testReadWrite(String inputPath) throws Exception { - createFileForRead(); - - Path path = new Path(txtOutputPath); - final FileSystem fs = path.getFileSystem(new Configuration()); - if (fs.exists(path)) fs.delete(path, true); - - Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name")); - Tap source = new Hfs(sourceScheme, inputPath); - - Scheme sinkScheme = new TextLine(new Fields("first", "last")); - Tap sink = new Hfs(sinkScheme, txtOutputPath); - - Pipe assembly = new Pipe("namecp"); - assembly = new Each(assembly, new UnpackTupleFunction()); - Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); - - flow.complete(); - String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000")); - assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result); - } - - private void createFileForRead() throws Exception { - final Path fileToCreate = new Path(parquetInputPath + "/names.parquet"); - - final Configuration conf = new Configuration(); - final FileSystem fs = fileToCreate.getFileSystem(conf); - if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true); - - TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); - TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0); - ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos)); - - Name n1 = new Name(); - n1.setFirst_name("Alice"); - n1.setLast_name("Practice"); - Name n2 = new Name(); - n2.setFirst_name("Bob"); - n2.setLast_name("Hope"); - Name n3 = new Name(); - n3.setFirst_name("Charlie"); - n3.setLast_name("Horse"); - - n1.write(protocol); - w.write(new BytesWritable(baos.toByteArray())); - baos.reset(); - n2.write(protocol); - w.write(new BytesWritable(baos.toByteArray())); - baos.reset(); - n3.write(protocol); - w.write(new BytesWritable(baos.toByteArray())); - w.close(); - } - - private static class UnpackTupleFunction extends BaseOperation implements Function { - @Override - public void operate(FlowProcess flowProcess, FunctionCall functionCall) { - TupleEntry arguments = functionCall.getArguments(); - Tuple result = new Tuple(); - - Tuple name = new Tuple(); - name.addString(arguments.getString(0)); - name.addString(arguments.getString(1)); - - result.add(name); - functionCall.getOutputCollector().add(result); - } - } - - private static class ProjectedTupleFunction extends BaseOperation implements Function { - @Override - public void operate(FlowProcess flowProcess, FunctionCall functionCall) { - TupleEntry arguments = functionCall.getArguments(); - Tuple result = new Tuple(); - - Tuple name = new Tuple(); - name.addString(arguments.getString(0)); -// name.addString(arguments.getString(1)); - - result.add(name); - functionCall.getOutputCollector().add(result); - } - } -} diff --git a/parquet-cascading-common23-deprecated/src/test/resources/names.txt b/parquet-cascading-common23-deprecated/src/test/resources/names.txt deleted file mode 100644 index e2d0408c8f..0000000000 --- a/parquet-cascading-common23-deprecated/src/test/resources/names.txt +++ /dev/null @@ -1,3 +0,0 @@ -Alice Practive -Bob Hope -Charlie Horse diff --git a/parquet-cascading-common23-deprecated/src/test/thrift/test.thrift b/parquet-cascading-common23-deprecated/src/test/thrift/test.thrift deleted file mode 100644 index c58843dc2e..0000000000 --- a/parquet-cascading-common23-deprecated/src/test/thrift/test.thrift +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace java org.apache.parquet.thrift.test - -struct Name { - 1: required string first_name, - 2: optional string last_name -} diff --git a/parquet-cascading-deprecated/.cache b/parquet-cascading-deprecated/.cache deleted file mode 100644 index 916a4dd2ba..0000000000 Binary files a/parquet-cascading-deprecated/.cache and /dev/null differ diff --git a/parquet-cascading-deprecated/REVIEWERS.md b/parquet-cascading-deprecated/REVIEWERS.md deleted file mode 100644 index 7ea7747f8e..0000000000 --- a/parquet-cascading-deprecated/REVIEWERS.md +++ /dev/null @@ -1,23 +0,0 @@ - - -| Name | Apache Id | github id | -|--------------------|------------|-------------| -| Dmitriy Ryaboy | dvryaboy | dvryaboy | -| Tianshuo Deng | tianshuo | tsdeng | diff --git a/parquet-cascading-deprecated/pom.xml b/parquet-cascading-deprecated/pom.xml deleted file mode 100644 index 2ddfffc201..0000000000 --- a/parquet-cascading-deprecated/pom.xml +++ /dev/null @@ -1,184 +0,0 @@ - - - - org.apache.parquet - parquet - ../pom.xml - 1.13.0-SNAPSHOT - - - 4.0.0 - - parquet-cascading-deprecated - jar - - Apache Parquet Cascading [Deprecated, will be removed after 1.12] - https://parquet.apache.org - - - - conjars.org - https://conjars.org/repo - - - - - - org.apache.parquet - parquet-column - ${project.version} - - - org.apache.parquet - parquet-hadoop - ${project.version} - - - org.apache.parquet - parquet-thrift - ${project.version} - - - org.apache.thrift - libthrift - ${thrift.version} - provided - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - provided - - - org.slf4j - slf4j-log4j12 - - - - - org.apache.parquet - parquet-column - ${project.version} - test-jar - test - - - org.mockito - mockito-all - ${mockito.version} - test - - - cascading - cascading-hadoop - ${cascading.version} - provided - - - org.slf4j - slf4j-api - ${slf4j.version} - test - - - org.slf4j - slf4j-simple - ${slf4j.version} - test - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 1.7 - - - add-source - generate-sources - - add-source - - - - ../parquet-cascading-common23-deprecated/src/main/java - - - - - add-test-source - generate-test-sources - - add-test-source - - - - ../parquet-cascading-common23-deprecated/src/test/java - - - - - add-test-resource - generate-test-resources - - add-test-resource - - - - - ../parquet-cascading-common23-deprecated/src/test/resources - - - - - - - - maven-enforcer-plugin - - - org.apache.maven.plugins - maven-jar-plugin - - - org.apache.thrift - thrift-maven-plugin - ${thrift-maven-plugin.version} - - ${thrift.executable} - ../parquet-cascading-common23-deprecated/src/main/thrift - ../parquet-cascading-common23-deprecated/src/test/thrift - - - - thrift-sources - generate-test-sources - - testCompile - - - - - - - diff --git a/parquet-cascading-deprecated/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading-deprecated/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java deleted file mode 100644 index b34ee7d24a..0000000000 --- a/parquet-cascading-deprecated/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.thrift.TBase; - -import cascading.flow.FlowProcess; -import cascading.tap.Tap; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.hadoop.ParquetInputFormat; -import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; -import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; -import org.apache.parquet.hadoop.thrift.ThriftReadSupport; -import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; -import org.apache.parquet.thrift.TBaseRecordConverter; - -@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x -public class ParquetTBaseScheme> extends ParquetValueScheme { - - // In the case of reads, we can read the thrift class from the file metadata - public ParquetTBaseScheme() { - this(new Config()); - } - - public ParquetTBaseScheme(Class thriftClass) { - this(new Config().withRecordClass(thriftClass)); - } - - public ParquetTBaseScheme(FilterPredicate filterPredicate) { - this(new Config().withFilterPredicate(filterPredicate)); - } - - public ParquetTBaseScheme(FilterPredicate filterPredicate, Class thriftClass) { - this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate)); - } - - public ParquetTBaseScheme(Config config) { - super(config); - } - - @Override - public void sourceConfInit(FlowProcess fp, - Tap tap, JobConf jobConf) { - super.sourceConfInit(fp, tap, jobConf); - jobConf.setInputFormat(DeprecatedParquetInputFormat.class); - ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class); - ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class); - } - - @Override - public void sinkConfInit(FlowProcess fp, - Tap tap, JobConf jobConf) { - - if (this.config.getKlass() == null) { - throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor"); - } - - DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); - DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class); - TBaseWriteSupport.setThriftClass(jobConf, this.config.getKlass()); - } -} diff --git a/parquet-cascading-deprecated/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading-deprecated/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java deleted file mode 100644 index 22e67ea720..0000000000 --- a/parquet-cascading-deprecated/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - package org.apache.parquet.cascading; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; - -import cascading.flow.FlowProcess; -import cascading.scheme.Scheme; -import cascading.scheme.SinkCall; -import cascading.scheme.SourceCall; -import cascading.tap.CompositeTap; -import cascading.tap.Tap; -import cascading.tap.TapException; -import cascading.tap.hadoop.Hfs; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.hadoop.Footer; -import org.apache.parquet.hadoop.ParquetInputFormat; -import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.apache.parquet.hadoop.mapred.Container; -import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; -import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; -import org.apache.parquet.schema.MessageType; - -/** - * A Cascading Scheme that converts Parquet groups into Cascading tuples. - * If you provide it with sourceFields, it will selectively materialize only the columns for those fields. - * The names must match the names in the Parquet schema. - * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the - * Parquet schema. - * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be - * flattened to a top-level field in the Cascading tuple. - */ - -@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x -public class ParquetTupleScheme extends Scheme{ - - private static final long serialVersionUID = 0L; - private String parquetSchema; - private final FilterPredicate filterPredicate; - - public ParquetTupleScheme() { - super(); - this.filterPredicate = null; - } - - public ParquetTupleScheme(Fields sourceFields) { - super(sourceFields); - this.filterPredicate = null; - } - - public ParquetTupleScheme(FilterPredicate filterPredicate) { - this.filterPredicate = Objects.requireNonNull(filterPredicate, "filterPredicate cannot be null"); - } - - public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) { - super(sourceFields); - this.filterPredicate = Objects.requireNonNull(filterPredicate, "filterPredicate cannot be null"); - } - - /** - * ParquetTupleScheme constructor used a sink need to be implemented - * - * @param sourceFields used for the reading step - * @param sinkFields used for the writing step - * @param schema is mandatory if you add sinkFields and needs to be the - * toString() from a MessageType. This value is going to be parsed when the - * parquet file will be created. - */ - public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { - super(sourceFields, sinkFields); - parquetSchema = schema; - this.filterPredicate = null; - } - - @SuppressWarnings("rawtypes") - @Override - public void sourceConfInit(FlowProcess fp, - Tap tap, JobConf jobConf) { - - if (filterPredicate != null) { - ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate); - } - - jobConf.setInputFormat(DeprecatedParquetInputFormat.class); - ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class); - TupleReadSupport.setRequestedFields(jobConf, getSourceFields()); - } - - @Override - public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { - MessageType schema = readSchema(flowProcess, tap); - SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); - - setSourceFields(intersection.getSourceFields()); - - return getSourceFields(); - } - - private MessageType readSchema(FlowProcess flowProcess, Tap tap) { - try { - Hfs hfs; - - if( tap instanceof CompositeTap ) - hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next(); - else - hfs = (Hfs) tap; - - List