3 changes: 3 additions & 0 deletions LICENSE
@@ -282,6 +282,9 @@ SOURCE/JAVA-CORE
This product contains SystemInfo methods adapted from oshi
* processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java

This product contains test cases adapted from Test Framework for Apache Drill (https://github.com/apache/drill-test-framework).
* sql/src/test/resources/drill/window


MIT License
================================
2 changes: 1 addition & 1 deletion extensions-core/parquet-extensions/pom.xml
@@ -447,7 +447,7 @@
</dependency>
</dependencies>
<properties>
<parquet.version>1.13.0</parquet.version>
<parquet.version>1.13.0</parquet.version>
</properties>
</profile>
</profiles>
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.parquet;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
import org.apache.druid.data.input.parquet.simple.ParquetGroupConverter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.File;

/**
* Converts parquet files into newline-delimited JSON object files. Takes a single argument (an input directory)
* and processes all files that end with a ".parquet" extension. Writes out a new file in the same directory named
* by appending ".json" to the old file name. Will overwrite any output file that already exists.
*/
public class ParquetToJson
{

public static void main(String[] args) throws Exception
{
if (args.length != 1) {
throw new IAE("Usage: directory");
}

ParquetGroupConverter converter = new ParquetGroupConverter(true);
ObjectMapper mapper = new DefaultObjectMapper();

File[] inputFiles = new File(args[0]).listFiles(
pathname -> pathname.getName().endsWith(".parquet")
);
for (File inputFile : inputFiles) {
File outputFile = new File(inputFile.getAbsolutePath() + ".json");

try (
final org.apache.parquet.hadoop.ParquetReader<Group> reader = org.apache.parquet.hadoop.ParquetReader
.builder(new GroupReadSupport(), new Path(inputFile.toURI()))
.build();
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)
) {
Group group;
while ((group = reader.read()) != null) {
writer.write(converter.convertGroup(group));
}
}
}
}
}
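A minimal usage sketch of the utility above (illustrative only; the directory path is hypothetical and not part of this change):

// For every foo.parquet under the given directory, writes foo.parquet.json alongside it
// as newline-delimited JSON, overwriting any existing output file.
ParquetToJson.main(new String[]{"/tmp/parquet-fixtures"});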
@@ -40,11 +40,12 @@
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class ParquetGroupConverter
public class ParquetGroupConverter
{
private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
@@ -359,7 +360,6 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
return g.getInteger(fieldIndex, index);
case INT_64:
return g.getLong(fieldIndex, index);
// todo: idk wtd about unsigned
case UINT_8:
case UINT_16:
return g.getInteger(fieldIndex, index);
@@ -493,11 +493,29 @@ private static BigDecimal convertBinaryToDecimal(Binary value, int precision, in

private final boolean binaryAsString;

ParquetGroupConverter(boolean binaryAsString)
public ParquetGroupConverter(boolean binaryAsString)
{
this.binaryAsString = binaryAsString;
}

/**
* Recursively converts a group into a native Java Map
*
* @param g the group
* @return the native Java object
*/
public Object convertGroup(Group g)
{
Map<String, Object> retVal = new LinkedHashMap<>();

for (Type field : g.getType().getFields()) {
final String fieldName = field.getName();
retVal.put(fieldName, convertField(g, fieldName));
}

return retVal;
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.parquet;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.List;

@SuppressWarnings("ALL")
public class ParquetToJsonTest
{
@ClassRule
public static TemporaryFolder tmp = new TemporaryFolder();

@Test
public void testSanity() throws Exception
{
final File tmpDir = tmp.newFolder();
try (InputStream in = new BufferedInputStream(ClassLoader.getSystemResourceAsStream("smlTbl.parquet"))) {
Files.copy(in, tmpDir.toPath().resolve("smlTbl.parquet"));
}

ParquetToJson.main(new String[]{tmpDir.toString()});

DefaultObjectMapper mapper = DefaultObjectMapper.INSTANCE;
List<Object> objs = mapper.readerFor(Object.class).readValues(new File(tmpDir, "smlTbl.parquet.json")).readAll();

Assert.assertEquals(56, objs.size());
Assert.assertEquals(
ImmutableMap
.builder()
.put("col_int", 8122)
.put("col_bgint", 817200)
.put("col_char_2", "IN")
.put("col_vchar_52", "AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB")
.put("col_tmstmp", 1409617682418L)
.put("col_dt", 422717616000000L)
.put("col_booln", false)
.put("col_dbl", 12900.48)
.put("col_tm", 33109170)
.build(),
objs.get(0)
);
}

@Test
public void testInputValidation()
{
Assert.assertThrows(IAE.class, () -> ParquetToJson.main(new String[]{}));
Assert.assertThrows(IAE.class, () -> ParquetToJson.main(new String[]{"a", "b"}));
}
}
Binary file not shown.
11 changes: 11 additions & 0 deletions licenses.yaml
@@ -174,6 +174,17 @@ source_paths:

---

name: Test Framework for Apache Drill
version:
url: https://github.com/apache/drill-test-framework
license_category: source
module: java-core
license_name: Apache License version 2.0
source_paths:
- sql/src/test/resources/drill/window

---

name: AWS SDK for Java
license_category: binary
module: java-core
@@ -154,6 +154,28 @@ public static DruidException fromFailure(Failure failure)
return failure.makeException(new DruidExceptionBuilder(failure.getErrorCode()));
}

/**
* Build a "defensive" exception, this is an exception that should never actually be triggered, but we are
* throwing it inside of a defensive check.
*
* @return A builder for a defensive exception.
*/
public static DruidExceptionBuilder defensive()
{
return forPersona(Persona.DEVELOPER).ofCategory(Category.DEFENSIVE);
}

/**
* Build a "defensive" exception, this is an exception that should never actually be triggered, but we are
* throwing it inside of a defensive check.
*
* @return A builder for a defensive exception.
*/
public static DruidException defensive(String format, Object... args)
{
return defensive().build(format, args);
}
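As an illustrative sketch of how a caller might use the new builder (the check and the variable name below are hypothetical, not part of this diff):

// Hypothetical defensive check; `rowCount` is an assumed local variable.
if (rowCount < 0) {
  throw DruidException.defensive("Expected a non-negative row count, got [%s]", rowCount);
}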

private final Persona targetPersona;
private final Category category;
private final String errorCode;
@@ -340,7 +362,7 @@ public enum Category
UNSUPPORTED(501),
/**
* A catch-all for any time when we cannot come up with a meaningful categorization. This is hopefully only
* used when converting generic exceptions from frameworks and libraries that we do not control into DruidExcpetions
* used when converting generic exceptions from frameworks and libraries that we do not control into DruidExceptions
*/
UNCATEGORIZED(500);

@@ -27,6 +27,9 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
@@ -36,7 +39,9 @@
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Comparator;

public class ComplexFrameColumnReader implements FrameColumnReader
{
@@ -47,8 +52,28 @@ public class ComplexFrameColumnReader
this.columnNumber = columnNumber;
}

@Override
public Column readRACColumn(Frame frame)
{
return new ColumnAccessorBasedColumn(makeComplexFrameColumn(frame));
}

@Override
public ColumnPlus readColumn(final Frame frame)
{
final ComplexFrameColumn frameCol = makeComplexFrameColumn(frame);

return new ColumnPlus(
frameCol,
new ColumnCapabilitiesImpl()
.setType(frameCol.getType())
.setHasMultipleValues(false),
frame.numRows()
);
}

@Nonnull
private ComplexFrameColumn makeComplexFrameColumn(Frame frame)
{
final Memory memory = frame.region(columnNumber);
validate(memory, frame.numRows());
@@ -68,17 +93,12 @@ public ColumnPlus readColumn(final Frame frame)
final long startOfOffsetSection = Byte.BYTES + Integer.BYTES + typeNameLength;
final long startOfDataSection = startOfOffsetSection + (long) frame.numRows() * Integer.BYTES;

return new ColumnPlus(
new ComplexFrameColumn(
frame,
serde,
memory,
startOfOffsetSection,
startOfDataSection
),
new ColumnCapabilitiesImpl().setType(ColumnType.ofComplex(typeName))
.setHasMultipleValues(false),
frame.numRows()
return new ComplexFrameColumn(
frame,
serde,
memory,
startOfOffsetSection,
startOfDataSection
);
}

@@ -100,7 +120,7 @@ private void validate(final Memory region, final int numRows)
}
}

private static class ComplexFrameColumn implements ComplexColumn
private static class ComplexFrameColumn extends ObjectColumnAccessorBase implements ComplexColumn
{
private final Frame frame;
private final ComplexMetricSerde serde;
@@ -184,6 +204,30 @@ public void close()
// Do nothing.
}

@Override
public ColumnType getType()
{
return ColumnType.ofComplex(serde.getTypeName());
}

@Override
public int numRows()
{
return getLength();
}

@Override
protected Object getVal(int rowNum)
{
return getRowValue(rowNum);
}

@Override
protected Comparator<Object> getComparator()
{
return serde.getTypeStrategy();
}

@Nullable
private Object getObjectForPhysicalRow(final int physicalRow)
{