
add parquet support to native batch #8883

Merged
gianm merged 12 commits into apache:master from clintropolis:parquet-native-batch
Nov 22, 2019

Conversation

@clintropolis (Member) commented Nov 16, 2019

Description

As a follow-up to #8823, this PR adds Parquet support to Druid native batch indexing, largely re-using existing code from the current Hadoop extension. All of the unit tests have been adapted to also run with the new DruidParquetReader.

Parquet can be used in native batch indexing with any InputSource, for example:

   "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "/some/path/to/wikipedia/file/",
        "filter": "wiki.parquet"
      },
      "inputFormat": {
        "type": "parquet"
        "flattenSpec": {
            "useFieldDiscovery": true
        },
        "binaryAsString": false
      },
      "appendToExisting": false
    },

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added or updated version, license, or notice information in licenses.yaml
  • added unit tests or modified existing tests to cover new code paths.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • DruidParquetReader
  • DruidNativeParquetInputFormat

@vogievetsky (Contributor) commented:

Are you planning to add the .md docs as part of this PR? I am trying to figure out what binaryAsString is.

@clintropolis (Member, Author) replied:

> Are you planning to add the .md docs as part of this PR? I am trying to figure out what binaryAsString is.

I marked this as WIP because I haven't yet updated the docs, or added the toJson to support the sampler.

That said, binaryAsString is just an option I was preserving from the Hadoop extension; see http://druid.apache.org/docs/latest/development/extensions-core/parquet.html. ORC and Avro actually have this property too, but it isn't documented for them. In all cases it simply decodes byte[] columns as UTF-8 strings instead of leaving them as byte[], which would otherwise end up serialized as base64 binary.
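To make that concrete, here is a minimal, hypothetical sketch (not the actual Druid code) of the two paths a byte[] column can take depending on the flag:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BinaryAsStringSketch
{
  // Hypothetical helper illustrating the binaryAsString option: when true,
  // a Parquet byte[] value is decoded as UTF-8 text; when false, the raw
  // bytes survive and end up base64-encoded once serialized to JSON.
  static String convertBinary(byte[] bytes, boolean binaryAsString)
  {
    if (binaryAsString) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
    return Base64.getEncoder().encodeToString(bytes);
  }

  public static void main(String[] args)
  {
    byte[] raw = "wiki".getBytes(StandardCharsets.UTF_8);
    System.out.println(convertBinary(raw, true));   // wiki
    System.out.println(convertBinary(raw, false));  // d2lraQ==
  }
}
```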

@clintropolis clintropolis removed the WIP label Nov 18, 2019
{
final Configuration conf = new Configuration();

// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
Member Author:

This is copied from the HDFS module to initialize Hadoopy things... Not sure of a good way to share this because it requires Hadoop libraries that core Druid doesn't have...

Contributor:

Your github comment may be useful to have in the source code as well

Contributor:

Agree it would be good to mention this code is copied from HdfsStorageDruidModule, and why. (And a similar comment in that file.)

That way, if someone modifies it in the future to fix a problem, they'll hopefully remember to fix both places.
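For readers unfamiliar with the classloader trick being discussed, a generic sketch of the pattern (names hypothetical, not the HdfsStorageDruidModule code itself): Hadoop resolves classes through the thread context classloader, which inside an extension may not see the extension's own dependencies, so the code temporarily swaps in an explicit classloader and restores the original afterwards.

```java
import java.util.function.Supplier;

public class ContextClassLoaderSketch
{
  // Run an action with an explicit thread context classloader, restoring
  // the previous one afterwards, so Hadoop's class lookups resolve against
  // the extension's own dependencies.
  static <T> T withClassLoader(ClassLoader loader, Supplier<T> action)
  {
    final Thread thread = Thread.currentThread();
    final ClassLoader original = thread.getContextClassLoader();
    thread.setContextClassLoader(loader);
    try {
      return action.get();
    }
    finally {
      thread.setContextClassLoader(original);
    }
  }

  public static void main(String[] args)
  {
    String result = withClassLoader(
        ContextClassLoaderSketch.class.getClassLoader(),
        () -> "initialized"
    );
    System.out.println(result);  // initialized
  }
}
```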

@ccaominh (Contributor) left a comment:

LGTM 👍

<artifactId>hadoop-common</artifactId>
<version>${hadoop.compile.version}</version>
<scope>compile</scope>
<!-- heh -->
Contributor:

Perhaps change the comment to mention why the exclusions are needed?

Contributor:

Yeah, please do.


{
private final InputRowSchema inputRowSchema;
private final ObjectFlattener<Group> flattener;
private final byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
Contributor:

Can be converted to a local variable

private final ParquetGroupJsonProvider jsonProvider;

private final ParquetReader<Group> reader;
private final ParquetMetadata metadata;
Contributor:

Unused? It's updated but never read.

private final ParquetMetadata metadata;
private final Closer closer;

public DruidParquetReader(
Contributor:

Can be package-private

return converter.finalizeConversion(actualList);
}
// unknown, just pass it through
return o;
Contributor:

This case may not be covered by unit tests

import java.util.ArrayList;
import java.util.List;

public class BaseParquetReaderTest
Contributor:

Can be package-private

);
List<InputRowListPlusJson> sampled = sampleAllRows(reader);
List<InputRowListPlusJson> sampledAsBinary = sampleAllRows(readerNotAsString);
final String expectedJson = "{\n"
Contributor:

Do you know why InputEntityReader.DEFAULT_JSON_WRITER uses a pretty print writer instead of writing minified JSON?

Contributor:

I talked with @vogievetsky offline and DEFAULT_JSON_WRITER will be removed in my follow up PR.

Contributor:

IntermediateRowParsingReader will have toMap() instead of toJson().


import java.util.Objects;

/**
* heh, DruidParquetInputFormat already exists, so I need another name
Contributor:

Please nix the 'heh', & consider using first-person plural rather than singular.

/**
* heh, DruidParquetInputFormat already exists, so I need another name
*/
public class DruidNativeParquetInputFormat extends NestedInputFormat
Contributor:

How about just ParquetInputFormat? There's one in Hadoop, but who cares. It's not the boss of us.

import java.util.Map;
import java.util.NoSuchElementException;

public class DruidParquetReader extends IntermediateRowParsingReader<Group>
Contributor:

Just ParquetReader should be fine. Everything in here is Druid.


}
}

binder.requestInjection(TypeLiteral.get(Configuration.class), conf);
Contributor:

What's this for? HdfsStorageDruidModule doesn't do it. (Please include a comment.)

return DEFAULT_JSON_WRITER.writeValueAsString(converted);
}

private Object convertObject(Object o)
Contributor:

normalizeObjectForJson?

Contributor:

Actually, this is a pretty cool method that seems like it'd be useful for Avro / ORC too. Could you consider putting it in core so all the extensions can use it?

@gianm (Contributor) commented Nov 19, 2019:

This'll need a merge from master to fix LGTM.

Comment on lines +141 to +146
for native batch indexing with Parquet files, we require a small number of classes provided by hadoop-common and
hadoop-mapreduce-client-core. However, both of these jars have a very large set of dependencies, the majority of
which we do not need (and are provided by Hadoop in that environment). hadoop-common is the biggest offender,
with things like zookeeper, jetty, just .. so much stuff. These exclusions remove ~60 jars from being unnecessarily
bundled with this extension. There might be some alternative arrangement to get what we need, worth looking into if
anyone is feeling adventurous.
Contributor:

This comment is very helpful. When I look at the various POMs, I often wonder why there are lots of exclusions. Thanks for adding!
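As an illustration of the mechanism under discussion (the artifact names here are examples, not the PR's actual exclusion list), a Maven dependency can shed unwanted transitive jars like so:

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.compile.version}</version>
  <scope>compile</scope>
  <exclusions>
    <!-- example exclusion: drop a transitive dependency the extension never uses -->
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```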

private final org.apache.parquet.hadoop.ParquetReader<Group> reader;
private final Closer closer;

public ParquetReader(
Contributor:

Can be package-private

import java.util.concurrent.TimeUnit;

class ParquetGroupConverter
public class ParquetGroupConverter
Contributor:

Can be package-private

Comment on lines +63 to +65
while (iterator.hasNext()) {
rows.add(iterator.next());
}
Contributor:

Another option is to do iterator.forEachRemaining(rows::add) instead. Similar for the method below.
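A self-contained sketch of the suggested alternative (class and method names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DrainIterator
{
  // Equivalent to the explicit while (iterator.hasNext()) loop in the
  // hunk above: forEachRemaining drains the iterator in one call.
  static List<String> drain(Iterator<String> iterator)
  {
    final List<String> rows = new ArrayList<>();
    iterator.forEachRemaining(rows::add);
    return rows;
  }

  public static void main(String[] args)
  {
    System.out.println(drain(List.of("a", "b", "c").iterator()));  // [a, b, c]
  }
}
```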

@gianm (Contributor) left a comment:

👍 after latest changes, thanks @clintropolis

@gianm gianm merged commit 7250010 into apache:master Nov 22, 2019
@clintropolis clintropolis deleted the parquet-native-batch branch November 22, 2019 23:31
jon-wei pushed a commit to jon-wei/druid that referenced this pull request Nov 26, 2019
* add parquet support to native batch

* cleanup

* implement toJson for sampler support

* better binaryAsString test

* docs

* i hate spellcheck

* refactor toMap conversion so can be shared through flattenerMaker, default impls should be good enough for orc+avro, fixup for merge with latest

* add comment, fix some stuff

* adjustments

* fix accident

* tweaks
@jon-wei jon-wei added this to the 0.17.0 milestone Dec 17, 2019