Conversation

@pvary (Contributor) commented Sep 1, 2020

The goal of the patch is to have a PoC implementation of Hive writes to Iceberg tables.

The patch changes:

  • HiveIcebergOutputFormat / RecordWriter to write out data to Iceberg tables
  • HiveIcebergSerDe.serialize so that the data in the Writables is converted to the correct Iceberg values
  • Tests for the happy path of the things above

The tests run successfully. Also, after applying this change on top of the uber jar patch, I was able to create a Hive table on top of an unpartitioned Iceberg table, write data to it, and read it back. The table was created with the following command:

CREATE EXTERNAL TABLE purchases STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/tmp/hive/iceberg/test-run-3885506338417686444/purchases'
TBLPROPERTIES ('iceberg.mr.write.file.format'='orc') ;

Findings:

  • When writing UUID type fields the Parquet writer requires a byte[], while ORC and Avro require a UUID object. I think this should be consistent across the FileFormats.
  • The Hive interface does not provide a way to "commit" all the writes in a query at once. I will do more investigation, but I have not found a way to do it in one step. The current implementation immediately commits the changes when the writer is closed. This is suboptimal and not correct when the written data ends up in multiple FileSinks.

Missing stuff:

  • Hive commit as described above
  • Way to handle partitioned tables. Do I have to write multiple DataFiles and add them to the commit manually, or is there an API that helps with this? What about multi-column partitioning - is it possible?
  • More tests
  • Error handling
  • Logging
  • Did I mention more tests? :)

Any suggestions / ideas / thoughts are welcome.

Thanks,
Peter

@probot-autolabeler bot added the MR label Sep 1, 2020
@pvary (Contributor, Author) commented Sep 1, 2020

@massdosage, @rdblue: You might be interested in reviewing this.
Thanks,
Peter

@rdblue (Contributor) commented Sep 1, 2020

Thanks for working on this, @pvary! I think the first thing is to revert some of the renaming. That will make the patch smaller and easier to review. We'll also need to find a place to perform the commit, as you said. Maybe @omalley can help advise.

@pvary (Contributor, Author) commented Sep 3, 2020

Here comes the new implementation with the mapred.OutputCommitter.
If the previous one was The Bad, then this one is The Ugly. 😃

What I have learned:

  • mapred.OutputCommitter has to be a static class - and it has to have a default constructor
2020-09-03T13:03:10,637  INFO [Thread-100] mapred.LocalJobRunner: Failed to createOutputCommitter
java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat$IcebergOutputCommitter.<init>()
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135) ~[hadoop-common-3.1.0.jar:?]
	at org.apache.hadoop.mapred.LocalJobRunner$Job.createOutputCommitter(LocalJobRunner.java:515) ~[hadoop-mapreduce-client-common-3.1.0.jar:?]
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:530) ~[hadoop-mapreduce-client-common-3.1.0.jar:?]
Caused by: java.lang.NoSuchMethodException: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat$IcebergOutputCommitter.<init>()
	at java.lang.Class.getConstructor0(Class.java:3082) ~[?:1.8.0_152]
	at java.lang.Class.getDeclaredConstructor(Class.java:2178) ~[?:1.8.0_152]
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129) ~[hadoop-common-3.1.0.jar:?]
  • There is a method for configuring jobs which is very handy if specific configuration is needed: HiveIcebergStorageHandler.configureJobConf. We can use this to set the OutputCommitter - otherwise Hive uses the default NullOutputCommitter (a sketch follows this list).
  • There is a different instance of HiveIcebergOutputFormat on HiveServer2 and on the Mappers/Reducers. Reflection is used to create it from scratch on the Mapper/Reducer side, so the only way to send information to it is through the JobConf. See: HiveIcebergStorageHandler.configureOutputJobProperties
  • getHiveRecordWriter will be called on the Mapper/Reducer side, so we do not want to load the table there 😄
  • We might be able to communicate between the RecordWriter and the OutputCommitter on the Mapper/Reducer side through JobConf/TaskAttemptContext.jobConf, but I am not too comfortable with it - any thoughts here?
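
A minimal sketch of the configureJobConf hook, assuming the HiveIcebergOutputCommitter class name used later in this PR; the class below is illustrative, not the PR's actual code:

import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.JobConf;

public class HiveIcebergStorageHandlerSketch {
  // Hive calls this on HiveServer2 while building the JobConf for the query
  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
    // Without this, Hive falls back to NullOutputCommitter and our commit logic never runs
    jobConf.set("mapred.output.committer.class",
        "org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter");
  }
}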

The basic idea behind the patch is:

  1. We need to use HiveIcebergStorageHandler.configureJobConf to set the mapred.output.committer.class.
  2. Writers create the files in a directory defined by the table and the Hive query id. When a writer is closed, it closes the appender and creates a committed marker file to mark the data file as finished.
  3. When the job is finished, the committer lists the directory and appends the data files to the table (sketched below).
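
A rough sketch of step 3, assuming a hypothetical directory layout and a ".committed" marker suffix (not the PR's actual code): list the query output directory and append every finished data file to the table in a single Iceberg commit.

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;

class JobCommitSketch {
  static void commitJob(FileSystem fs, Path queryOutputDir, Table table) throws IOException {
    AppendFiles append = table.newAppend();
    for (FileStatus status : fs.listStatus(queryOutputDir)) {
      String name = status.getPath().getName();
      if (!name.endsWith(".committed")) {
        continue; // only files with a marker were fully written by a committed task
      }
      // the marker name points at the data file written by the task
      Path dataPath = new Path(queryOutputDir, name.substring(0, name.length() - ".committed".length()));
      DataFile dataFile = DataFiles.builder(table.spec())
          .withPath(dataPath.toString())
          .withFormat(FileFormat.ORC) // format assumed for the sketch
          .withFileSizeInBytes(fs.getFileStatus(dataPath).getLen())
          .withRecordCount(1) // placeholder; the real patch carries the writer's metrics
          .build();
      append.appendFile(dataFile);
    }
    append.commit(); // one atomic Iceberg append for the whole Hive query
  }
}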

What I do not like:

  • It relies on listing the target directory. In Hive we do this often, and on S3 we have to use S3Guard to get a reliable listing. I was hoping to get rid of this, at least for Iceberg tables. @rdblue, @massdosage: Do you think this is an acceptable compromise for Hive writes? Do you have any better ideas?
  • Creating the commit file would be better placed in commitTask, but serializing the Metrics into the JobConf object seems like a bad idea. @rdblue, @massdosage: Any thoughts?

What I have found:

  • org.apache.iceberg.Metrics implements Serializable but uses ByteBuffers, which are not Serializable, so I had to create a serializable version of it (see the sketch below).
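
The core of that workaround as a simplified sketch (the PR's SerializableMetrics covers all the Metrics fields; the helper below only shows the ByteBuffer/byte[] conversion idea):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class ByteBufferMaps {
  private ByteBufferMaps() {
  }

  // copy the non-Serializable ByteBuffer bounds into plain byte arrays before Java serialization
  static Map<Integer, byte[]> toByteArrays(Map<Integer, ByteBuffer> buffers) {
    if (buffers == null) {
      return null;
    }
    Map<Integer, byte[]> copy = new HashMap<>();
    buffers.forEach((id, buffer) -> {
      ByteBuffer duplicate = buffer.duplicate(); // do not disturb the original position/limit
      byte[] bytes = new byte[duplicate.remaining()];
      duplicate.get(bytes);
      copy.put(id, bytes);
    });
    return copy;
  }

  // wrap the byte arrays back into ByteBuffers after deserialization
  static Map<Integer, ByteBuffer> toByteBuffers(Map<Integer, byte[]> arrays) {
    if (arrays == null) {
      return null;
    }
    Map<Integer, ByteBuffer> copy = new HashMap<>();
    arrays.forEach((id, bytes) -> copy.put(id, ByteBuffer.wrap(bytes)));
    return copy;
  }
}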

Thanks,
Peter

@rdblue (Contributor) commented Sep 3, 2020

Relies on listing of the target dir.

Can we find out in job commit how many writer tasks there were? Then we could use well-known locations and make sure each one is read.

@pvary (Contributor, Author) commented Sep 4, 2020

Relies on listing of the target dir.

Can we find out in job commit how many writer tasks there were? Then we could use well-known locations and make sure each one is read.

I suspect that the JobContext contains only input information about the number of mappers/reducers. I have only debugged the LocalJobRunner code so far, but I did not see anything indicating that we have up-to-date information there.
The only solution I was able to come up with is creating a new JobClient to get the info from the server. I have not been able to make that work with the LocalJobRunner yet, and I think it would be too MR-specific.
How does this work for Spark writes? Do we have any other places where an MR write is already implemented for Iceberg?

Updated the PR to commit the task only in IcebergOutputCommitter.commitTask, and not in IcebergRecordWriter.close.

@pvary (Contributor, Author) commented Sep 8, 2020

Since @steveloughran is our resident OutputCommitter expert, I feel better about the currently proposed solution now.
Based on his suggestions I parallelized the FileSystem calls and also fleshed out the tests.

@rdblue, @massdosage: What are your thoughts on the current proposed solution? Code styling and formatting questions are also fair game, since I am still learning the style while working on Iceberg. Could you please take a look at #1430 too, as it would help us get rid of the SerializableMetrics class.

Thanks,
Peter

@rdblue (Contributor) commented Sep 8, 2020

Good to see your name pop up here, @steveloughran!

}

// Appending data files to the table
AppendFiles append = table.newAppend();
Contributor:

How do we know that the write is always an append? Does Hive pass the operation so we can determine whether to use an overwrite or replacePartitions operation?

Contributor (Author):

Integrating with Hive update/delete statements would be a harder nut to crack.
Currently we are aiming only for inserts.
Does this mean we still have to think about replacePartitions or overwrite? Or was my assumption correct that we only need newAppend in this case?

Contributor (Author):

The current patch only aims to insert new data into the table (no delete/update etc. at the moment).
@rdblue: Is it enough to add data with newAppend, or do we have to think about overwrite/replacePartitions too?
Thanks,
Peter

public boolean needsTaskCommit(TaskAttemptContext context) {
// We need to commit if this is the last phase of a MapReduce process
return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
context.getJobConf().getNumReduceTasks() == 0;
Contributor:

Can't this just be true? I think that every task using the output committer needs to commit.

Contributor (Author):

We can define the OutputCommitter only at the job level, and AFAIK Hive writes only in the last stage of the job, be it a Reducer for a MapReduce job or a Mapper for a map-only job.

This way we can save IO by not generating and reading those unnecessary files, but it depends on assumptions about how Hive works.

Your thoughts? Do you still vote for the more general solution?

Contributor:

What I mean is that I don't think Hive calls the output committer unless it is writing to the table. The map phase of a write job should call the committer only if it is a map-only job. And the reduce phase will call the committer if the job has a reduce phase. Because the MR framework takes care of calling commitTask for the right phase, we can default this to true and it should be identical.

You can probably test this assumption by changing the logic here to throw an exception if it is violated.
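
One way to run that test, as a hypothetical variant of the snippet above: keep the phase check, but fail loudly if a task outside the expected phase ever asks to commit.

@Override
public boolean needsTaskCommit(TaskAttemptContext context) {
  boolean lastPhase = TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
      context.getJobConf().getNumReduceTasks() == 0;
  if (!lastPhase) {
    // if this ever fires, Hive does ask non-final-phase tasks to commit
    throw new IllegalStateException("needsTaskCommit called outside the final phase: " + context.getTaskAttemptID());
  }
  return true;
}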

Contributor:

Some HiveRunner tests that do writes involving an MR job might help too?

Contributor (Author):

I have tested this by adding the jars to Hive master from the Apache repo and running the queries in local mode. I got some exceptions and started to investigate the cause.

The actual query I used for this had an order by, so it had both a Mapper and a Reducer phase:

insert into purchases select * from purchases order by id;

My debugging revealed that commitTask was called for Mappers as well as Reducers. The JobRunner infrastructure calls these and does not have any information about whether the task has actually written anything anywhere. I think this is the purpose of the needsTaskCommit() method, so the developer of the OutputCommitter can decide.

@massdosage: Do you have an easy example for HiveRunner write tests?

Thanks,
Peter

Contributor:

Thanks for testing it out. If Hive does call this even in map stages, then I agree that we should keep the logic that you have here.

Contributor:

@pvary I guess it would look very similar to one of the existing HiveRunner tests here, but one could do a hiveShell.execute() with an insert, update, etc. statement and then do a query to select the results back?

I'm not sure whether these examples (https://github.com/klarna/HiveRunner/blob/master/src/test/java/com/klarna/hiverunner/InsertIntoTableIntegrationTest.java and https://github.com/klarna/HiveRunner/blob/master/src/test/java/com/klarna/hiverunner/data/InsertIntoTableTest.java) from HiveRunner itself would trigger the OutputFormat for the insert operations? Presumably so, if the table under test is created using the StorageHandler?

I'd also highly recommend testing this in distributed mode; we found lots of issues with the InputFormat that only manifested themselves then. I can help with this if you want, as we already have a test environment set up for this.
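
For reference, a rough shape of such a HiveRunner write test (hypothetical, not the PR's actual TestHiveRunnerWrite.java; it assumes an unpartitioned Iceberg table with an (id int, name string) schema already exists at the given location):

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.StandaloneHiveRunner;
import com.klarna.hiverunner.annotations.HiveSQL;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(StandaloneHiveRunner.class)
public class HiveIcebergInsertSketchTest {

  @HiveSQL(files = {})
  public HiveShell shell;

  @Test
  public void testInsertAndReadBack() {
    // overlay a Hive table on the existing Iceberg table, mirroring the earlier CREATE example
    shell.execute("CREATE EXTERNAL TABLE test_table " +
        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
        "LOCATION '/tmp/hive/iceberg/test_table' " +
        "TBLPROPERTIES ('iceberg.mr.write.file.format'='orc')");
    shell.execute("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')");
    List<String> rows = shell.executeQuery("SELECT * FROM test_table ORDER BY id");
    Assert.assertEquals(2, rows.size());
  }
}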

Contributor (Author):

@massdosage: Created HiveRunner tests. Could you please take a look at them? (TestHiveRunnerWrite.java)

Trying it out on a real cluster is on my TODO list and I will definitely do it, but if you have a test cluster and some ideas, it would be good if you took a stab at it too (as we say in Hungary, "more eyes see more"). I tried with Hive on LLAP, but the OutputCommitter lifecycle is not handled correctly there yet, so we have to try the MR way.

Contributor (Author):

@massdosage, @rdblue: I tested this on an MR cluster by running the following queries successfully:

insert into test_table values(3,"3"),(4,"4");
insert into test_table select * from test_table;
insert into test_table select * from test_table order by id;
insert into test_table select * from test_table_2;

These included both map-only and MapReduce jobs. The results were checked with Hive select queries and everything seems OK.

So if Adrian's team did not find any issues, I think this PR is ready to be merged.
Your thoughts?

Thanks, Peter

@pvary changed the title from "Hive: HiveIcebergOutputFormat first implementation - WIP" to "Hive: HiveIcebergOutputFormat first implementation for handling Hive inserts - WIP" Sep 11, 2020
@pvary changed the title from "Hive: HiveIcebergOutputFormat first implementation for handling Hive inserts - WIP" to "Hive: HiveIcebergOutputFormat first implementation for handling Hive inserts into unpartitioned Iceberg tables - WIP" Sep 11, 2020
@pvary (Contributor, Author) commented Sep 11, 2020

The current status of the PR:

Open questions:

  • Waiting for #1430 (API: Fix Metrics serialization) to be approved, so I can get rid of the extra SerializableMetrics class
  • Should every task write a .committed file, or only the tasks in the last phase of the job? A decision is needed on whether the main goal is performance or generality; the current solution is more performance friendly.
  • Do we need to handle Iceberg constructs other than newAppend() for adding data to the table? If I understand everything correctly, this should be enough to handle inserts started from Hive.

Postponed:

  • Finding a better place for the configuration

Non-goals:

  • Updates/Deletes
  • Insert overwrites
  • Partitioned Iceberg table handling
  • Complex structures (Map / List / Struct)

@pvary changed the title from "Hive: HiveIcebergOutputFormat first implementation for handling Hive inserts into unpartitioned Iceberg tables - WIP" to "Hive: HiveIcebergOutputFormat first implementation for handling Hive inserts into unpartitioned Iceberg tables" Sep 16, 2020
@pvary (Contributor, Author) commented Sep 17, 2020

Can we push this change @massdosage, @rdblue?

I would very much like to see it in the next release, as it would make our testing much easier.
Also, I have a patch at hand which would enable Hive users to create previously non-existent tables over Iceberg; it is not that big and depends on this one. We might even be able to get it into the release if we push this soon. With these two patches we could write whole end-to-end Hive tests (CREATE/WRITE/READ/DROP) with the next release.

All that said, we can work on our own branch, so I do not want to push too hard 😸

@pvary force-pushed the insert branch 2 times, most recently from 8ff1668 to ee6f1b6 on September 24, 2020 15:45
@rdblue (Contributor) commented Oct 7, 2020

@pvary, after #1495, are we ready to start getting this write support in? I can start reviewing it next.

@pvary force-pushed the insert branch 2 times, most recently from 774ef50 to 3c858b3 on October 20, 2020 17:37
@pvary (Contributor, Author) commented Oct 20, 2020

@rdblue: If you have time, could you please review this one?
I rebased and also added partitioned writes.

@pvary (Contributor, Author) commented Oct 20, 2020

@marton-bod: You might be interested in this too, as I have changed some Hive3 related stuff as well.

@rdblue (Contributor) commented Oct 28, 2020

@pvary, how about we target this for the 0.11.0 release? Right now, we're trying to get 0.10.0 finished and that's taking attention. After that, I should have time to start reviewing this. Does that work for you?

@pvary (Contributor, Author) commented Nov 2, 2020

@pvary, how about we target this for the 0.11.0 release? Right now, we're trying to get 0.10.0 finished and that's taking attention. After that, I should have time to start reviewing this. Does that work for you?

OK with that, but I would definitely love to see this in 0.11.0 😃

@steveloughran (Contributor) left a comment

I've been dealing with changes related to job uniqueness (HADOOP-17318, SPARK-33402), so I worry about making sure you are getting unique values and paths everywhere.

Regarding the committer: why the "v1" API and not the later org.apache.hadoop.mapreduce package? I'd recommend implementing the committer under those classes... but then I'm not up to date with how Hive will use the committer.

What I will suggest: have every commit operation log its invocation and duration, along with the job id. This is what you need when trying to debug issues.
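
A small illustration of that logging suggestion (hypothetical helper, not part of the PR): record the job id and the duration of every commit-path operation.

import org.apache.hadoop.mapred.JobContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CommitLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(CommitLoggingSketch.class);

  // wrap commitJob/abortJob/commitTask bodies so every invocation is logged with its duration
  static void logged(JobContext context, String operation, Runnable commitStep) {
    long start = System.currentTimeMillis();
    LOG.info("{} started for job {}", operation, context.getJobID());
    commitStep.run();
    LOG.info("{} finished for job {} in {} ms", operation, context.getJobID(),
        System.currentTimeMillis() - start);
  }
}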

* An Iceberg table committer for adding data files to the Iceberg tables.
* Currently independent of the Hive ACID transactions.
*/
public final class HiveIcebergOutputCommitter extends OutputCommitter {
Contributor:

Why this and not the (newer) org.apache.hadoop.mapreduce.OutputCommitter?

Contributor (Author):

First I tried to implement the v2 (mapreduce) API and was surprised that my OutputCommitter methods were not called. I had to realize that Hive still uses the old v1 API. When I downgraded to v1, the methods were called.


@Override
public void commitTask(TaskAttemptContext context) throws IOException {
TaskAttemptID attemptID = context.getTaskAttemptID();
Contributor:

I've been having fun in Spark and Hadoop making sure this is unique. What runtime are you targeting here, and where does it get job/task IDs?

Contributor (Author):

The jobId is generated by Hadoop and I do not think Hive adds any magic there:

JobClient jc = new JobClient(job);
rj = jc.submitJob(job);
this.jobID = rj.getJobID();

Is the problem mentioned in the JIRAs above (HADOOP-17318, SPARK-33402) caused by the Spark-specific jobId generation, or would it also appear when we are using standard Hadoop JobIDs?

// If the data is not empty add to the table
if (!closedFiles.isEmpty()) {
closedFiles.forEach(file -> {
DataFiles.Builder builder = DataFiles.builder(table.spec())
Contributor:

In #1818, we're refactoring a little so that the writer produces a DataFile for you. Hopefully that will make this a bit smaller.

private DeserializerHelper() {
}

static Record deserialize(Object data, Schema tableSchema, ObjectInspector objectInspector) throws SerDeException {
Contributor:

Is it possible to use the approach we've taken with the other formats and build a tree based on the schema that Hive is writing to do the inspection?

For Spark, we generate this in advance. If the user writes a tuple of (string, short) to a table, we generate a struct reader that has two field readers, one to read a short and write an int, and one to read a UTF8String and write it as a CharSequence. That avoids inspecting the schema structure to find out what to do each time.
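
A sketch of that idea with hypothetical names, handling only primitive fields: inspect the Hive schema once, build one converter per field, and reuse the converters for every row instead of re-inspecting the types on each call.

import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;

class HiveValueReaderSketch {
  private final StructObjectInspector structInspector;
  private final List<? extends StructField> fields;
  private final Function<Object, Object>[] converters;
  private final Schema schema;

  @SuppressWarnings("unchecked")
  HiveValueReaderSketch(StructObjectInspector structInspector, Schema schema) {
    this.structInspector = structInspector;
    this.fields = structInspector.getAllStructFieldRefs();
    this.schema = schema;
    this.converters = new Function[fields.size()];
    for (int i = 0; i < fields.size(); i++) {
      // primitives only; a full reader tree would recurse into struct/map/list inspectors here
      PrimitiveObjectInspector fieldInspector =
          (PrimitiveObjectInspector) fields.get(i).getFieldObjectInspector();
      converters[i] = value -> value == null ? null : fieldInspector.getPrimitiveJavaObject(value);
    }
  }

  Record read(Object row) {
    GenericRecord record = GenericRecord.create(schema);
    for (int i = 0; i < fields.size(); i++) {
      record.set(i, converters[i].apply(structInspector.getStructFieldData(row, fields.get(i))));
    }
    return record;
  }
}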

Contributor (Author):

@rdblue: Could you please check #1854? I implemented the Deserializer as suggested (at least I hope I understood correctly what you meant 😄)

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class DeserializerHelper {
Contributor:

This seems like a good class to separate into its own PR with tests.

* An Iceberg table committer for adding data files to the Iceberg tables.
* Currently independent of the Hive ACID transactions.
*/
public final class HiveIcebergOutputCommitter extends OutputCommitter {
Contributor:

Can we separate this class into its own PR with unit tests? I think that would help break this down into manageable chunks.

@rdblue (Contributor) commented Jan 22, 2021

This was committed in separate PRs, so I'll close it. Thanks @pvary!

@rdblue closed this Jan 22, 2021