
Conversation

@YannByron
Contributor

Change Logs

This PR is going to support CDC read in Spark.

The changes are listed:

  1. extract some CDC common classes into hudi-common that will be used to implement CDCReader for different engines;
  2. add a CDCReader to respond to CDC queries on Spark.

Impact

Low

@YannByron
Contributor Author

@xushiyan @prasannarajaperumal @alexeykudinkin please continue to review this.
I will submit a follow-up commit that uses the Avro format, not the JSON format, to persist the fields.

@YannByron YannByron force-pushed the cdc_query branch 5 times, most recently from c4f22cd to e674bd8 on September 23, 2022 15:03
@nsivabalan
Contributor

Cancelling all Azure CI runs for now to investigate CI flakiness. Will retrigger the build once we are in a stable state. Sorry about the inconvenience.

* limitations under the License.
*/

package org.apache.spark.sql.avro
Contributor

Please avoid any changes to the borrowed classes -- we keep changes to them to the absolutely necessary minimum to make sure they do not diverge from the Spark impl, and so that we're able to cherry-pick and carry forward these changes whenever we backport a new version (from Spark).

Contributor Author

It has to change. The original logic (using `val converter: Any => Any = {`) has a bug: it returns the same value when we call this method twice in a row. And HoodieCDCRDD needs these changes.
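Roughly, a self-contained illustration of that pitfall (NOT the actual Spark code; the names here are made up):

```scala
import scala.collection.mutable.ArrayBuffer

object ConverterPitfall extends App {
  class Row { val values: ArrayBuffer[Any] = ArrayBuffer.empty }

  // A converter held in a `val` that reuses one shared mutable buffer
  // hands back the same instance on every call.
  private val shared = new Row
  val converter: Any => Any = { in =>
    shared.values.clear()
    shared.values += in
    shared // always the same instance
  }

  val before = converter("before-image")
  val after  = converter("after-image")
  // Both references point at the one buffer, which now holds "after-image" --
  // exactly the problem a CDC reader hits when it keeps `before` around.
  println(before.asInstanceOf[AnyRef] eq after.asInstanceOf[AnyRef]) // true
}
```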

Member

> It has to change. The original logic (using `val converter: Any => Any = {`) has a bug: it returns the same value when we call this method twice in a row. And HoodieCDCRDD needs these changes.

So this looks like an improvement we can land and fix separately? Better to track it separately, as the CDC impl. does not need to know about this fix, right? The APIs remain the same.

Another note: it's not very obvious to reviewers until you explain it as you did above. So for the sake of faster review, please comment on it yourself and explain proactively.

Contributor

I'm not sure I understand what the issue is. Can you please create a separate PR for it? If there's a bug, let's make sure we're adding the necessary tests for it.

import scala.collection.JavaConverters._
import scala.util.Try

object LogIteratorUtils {
Contributor

Let's consolidate this w/ LogFileIterator (let's name this LogFileIterator; there's no need for a separate utils object).

Contributor Author

done.

// TODO extract to HoodieAvroSchemaUtils
abstract class AvroProjection extends (GenericRecord => GenericRecord)

class SafeAvroProjection(
Contributor

nit: Please make sure we format params in line w/ the existing style formatting (I believe it'd also be captured in the style guide):

def foo(a: Int, 
        b: String, ...)

Contributor Author

done.

)
}

private lazy val mapper: ObjectMapper = {
Contributor

Do we still need this, given we moved to Avro?

Contributor Author

yes, we need this.

metaClient: HoodieTableMetaClient
) extends Iterator[InternalRow] with SparkAdapterSupport with AvroDeserializerSupport with Closeable {

private val fs = metaClient.getFs.getFileSystem
Contributor

These vals should be lazy by default

Contributor Author

done.

/**
 * The change type, which decides how to retrieve the change data. For more details, see: [[CDCFileTypeEnum]].
 */
private HoodieCDCLogicalFileType cdcFileType;
Contributor

Please make all fields final

Contributor Author

done.

* Here we define four CDC file types. The CDC file type decides which file will be
* used to extract the change data, and how to do so.
*
* CDC_LOG_FILE:
Contributor

Let's make sure these are in-sync w/ the RFC

@xushiyan did we end up revisiting this terminology in the RFC?

Contributor Author

We'll keep this PR and the RFC consistent.


import java.io.IOException;

public class HoodieCDCLogRecordReader implements ClosableIterator<IndexedRecord> {
Contributor

This is an Iterator rather than a Reader.

Contributor Author

done.


@Override
public boolean hasNext() {
if (itr == null || !itr.hasNext()) {
Contributor

nit: If we flip this conditional, we can decrease the nesting.
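Something along these lines (a sketch only; names assumed):

```scala
import java.util.{Iterator => JIterator}

// Flipping the conditional lets the fast path return immediately, so the
// block-advancing loop is no longer nested inside an `if`.
class FlattenedBlockIterator[T](blocks: JIterator[JIterator[T]]) extends Iterator[T] {
  private var itr: JIterator[T] = _

  override def hasNext: Boolean = {
    if (itr != null && itr.hasNext) return true // fast path, no nesting
    while (blocks.hasNext) {                    // advance to the next non-empty block
      itr = blocks.next()
      if (itr.hasNext) return true
    }
    false
  }

  override def next(): T = itr.next()
}
```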

* Provided with an instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
* Delta Log files (represented as [[InternalRow]]s)
*/
class LogFileIterator(split: HoodieMergeOnReadFileSplit,
Contributor

@YannByron are you making any changes to these or just extracting this code practically as is (with minor changes to abstract params)?

Contributor Author

Just extracting this code so that CDC can reuse it (minor changes: generalized some params to make them common and reusable).

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
Contributor

@danny0405 danny0405 Sep 24, 2022

Please fix the import ordering in all of the files. The org.apache.hudi package should come first.

Contributor Author

done

* at a single commit.
* <p>
* For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a current version of
* the base file in the group, and [[beforeFileSlice]] is None.
Contributor

[[...]] is Scala-style doc; you can use {@code xxx} instead in Java.

Contributor Author

ok

* Then build a [[ChangeFileForSingleFileGroupAndCommit]] object.
*/
private HoodieCDCFileSplit parseWriteStat(
HoodieFileGroupId fileGroupId,
Contributor

For ChangeFileForSingleFileGroupAndCommit, do you mean HoodieCDCFileSplit?

// no cdc log files can be used directly. we reuse the existing data file to retrieve the change data.
String path = writeStat.getPath();
if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
// this is a base file
Contributor

Better to use FSUtils.isBaseFile, because we support the ORC format as well.
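i.e., something like (a sketch, assuming Hudi's org.apache.hudi.common.fs.FSUtils helper):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils

// Checks against all registered base-file extensions (Parquet, ORC, ...)
// instead of hard-coding the Parquet suffix.
def isBaseFilePath(path: String): Boolean = FSUtils.isBaseFile(new Path(path))
```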

HoodieCDCFileSplit changeFile =
parseWriteStat(fileGroupId, instant, writeStat, commitMetadata.getOperationType());
if (!fgToCommitChanges.containsKey(fileGroupId)) {
fgToCommitChanges.put(fileGroupId, new ArrayList<>());
Contributor

Using computeIfAbsent should be fine.
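For example (a sketch; the key/value types here are assumed):

```scala
import java.util.{ArrayList, HashMap, List => JList}

object ComputeIfAbsentSketch extends App {
  // computeIfAbsent collapses the containsKey/put dance into a single
  // call that returns the (possibly freshly created) list.
  val fgToCommitChanges = new HashMap[String, JList[String]]()
  fgToCommitChanges
    .computeIfAbsent("fileGroupId-1", _ => new ArrayList[String]())
    .add("changeFile-1")
  println(fgToCommitChanges) // {fileGroupId-1=[changeFile-1]}
}
```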


- Schema mergedSchema = Schema.createRecord("CDC", null, tableSchema.getNamespace(), false);
+ Schema mergedSchema = Schema.createRecord("CDC", null, "", false);
mergedSchema.setFields(fields);
Contributor

Why this change?

Contributor Author

will restore it.

if (reader.hasNext()) {
HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next();
if (dataBlock.getBlockType() == HoodieLogBlock.HoodieLogBlockType.CDC_DATA_BLOCK) {
itr = dataBlock.getRecordIterator();
Contributor

Are there other data blocks here? If not, we can remove this check for efficiency.

Contributor Author

yes, it can be simplified.

* Here we use the debezium format.
*/
val FULL_CDC_SPARK_SCHEMA: StructType = {
StructType(
Contributor

Do we need to expose the schema as Debezium for the reader internally? Why not reuse the field _hoodie_operation, which is a Hudi format?

Curious how downstream pipelines handle these records. For SQL users, they declare the table schema, for example with fields (a, b, c, d); now you return an RDD with a schema of Avro -- how and when are the records deserialized into (a, b, c, d) then?

Contributor Author

The returned RDD uses the JSON string, not the Avro format (that is only used inside of Hudi).
op, ts_ms, before and after are the fields given in Debezium.
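For illustration, the four fields could be declared like this (a sketch; the exact Spark types are assumptions, not this PR's definition):

```scala
import org.apache.spark.sql.types._

// Debezium-style shape: `op` is i/u/d, `ts_ms` the commit time, and
// `before`/`after` carry the row images as JSON strings.
val FULL_CDC_SPARK_SCHEMA: StructType = StructType(Seq(
  StructField("op", StringType, nullable = true),
  StructField("ts_ms", StringType, nullable = true),
  StructField("before", StringType, nullable = true),
  StructField("after", StringType, nullable = true)
))
```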

@yihua yihua added the priority:blocker (Production down; release blocker), big-needle-movers and reader-core labels Sep 24, 2022
List<FileStatus> touchedFiles = new ArrayList<>();
for (String touchedPartition : touchedPartitions) {
Path partitionPath = FSUtils.getPartitionPath(basePath, touchedPartition);
touchedFiles.addAll(Arrays.asList(fs.listStatus(partitionPath)));
Contributor

This can be improved to only add the files that belong to the touched file groups.

Contributor

@danny0405 danny0405 left a comment

+1, nice work. Let's address the remaining comments in subsequent PRs.

// TODO extract to HoodieAvroSchemaUtils
abstract class AvroProjection extends (GenericRecord => GenericRecord)

class SafeAvroProjection(sourceSchema: Schema,
Member

If you moved code around, please annotate it by commenting on the PR yourself, explaining where it was moved from and what was modified. Otherwise it'll be hard for reviewers to make a call on whether to approve or not.

Contributor Author

Got it. It's moved from HoodieMergeOnReadRDD without any change.


@hudi-bot
Collaborator

CI report:


Member

@xushiyan xushiyan left a comment

+1. Let's make sure the in-code TODOs can be followed up on.

Comment on lines +221 to 224
} else if (isCdcQuery) {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
} else {
(tableType, queryType, isBootstrappedTable) match {
Member

This if-check could be merged with the match below, to achieve code alignment.
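e.g. shape-wise (a sketch with the relation types reduced to strings, just to show the merged dispatch):

```scala
// Folding the CDC branch into the existing match keeps the relation
// dispatch in a single expression.
def relationFor(isCdcQuery: Boolean, tableType: String, queryType: String): String =
  (tableType, queryType) match {
    case _ if isCdcQuery     => "CDCRelation"
    case ("mor", "snapshot") => "MergeOnReadSnapshotRelation"
    case _                   => "BaseFileOnlyRelation"
  }
```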

Comment on lines +71 to +73
private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
Member

Repeated check logic; it could be extracted to a util.
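For instance (a sketch; imports elided, using CDCRelation and DataSourceReadOptions as they appear in this PR):

```scala
import org.apache.hudi.common.table.HoodieTableMetaClient

// Single home for the repeated CDC-query check (helper name hypothetical).
def isCdcQuery(metaClient: HoodieTableMetaClient, parameters: Map[String, String]): Boolean =
  CDCRelation.isCDCEnabled(metaClient) &&
    parameters.get(DataSourceReadOptions.QUERY_TYPE.key)
      .contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
    parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key)
      .contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
```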

@xushiyan xushiyan merged commit df69aa7 into apache:master Sep 26, 2022
* file is new-coming, so we can load this, mark all the records with `i`, and treat them as
* the value of `after`. The value of `before` for each record is null.
*
* BASE_FILE_INSERT:
Contributor

Typo

* a whole file group. First we find this file group. Then load this, mark all the records with
* `d`, and treat them as the value of `before`. The value of `after` for each record is null.
*/
public enum HoodieCDCInferCase {
Contributor

"Infer" is a verb, we should rather call this HoodieCDCInferenceCase

originTableSchema: HoodieTableSchema,
cdcSchema: StructType,
requiredCdcSchema: StructType,
changes: Array[HoodieCDCFileGroupSplit])
Contributor

@YannByron let's make sure we annotate these as @transient (they shouldn't be serialized and passed down to executors, similar to other RDDs)
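i.e., roughly (a minimal, self-contained shape; the real class has more fields):

```scala
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow

// Driver-only handles marked @transient are dropped from the serialized
// task closure instead of being shipped to (or failing on) executors.
class ExampleCdcRDD(@transient val spark: SparkSession)
  extends RDD[InternalRow](spark.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] =
    Iterator.empty

  override protected def getPartitions: Array[Partition] = Array.empty
}
```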

changes: Array[HoodieCDCFileGroupSplit])
extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {

@transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
Contributor

Let's inline this to avoid mistakes


private lazy val fs = metaClient.getFs.getFileSystem

private lazy val conf = new Configuration(confBroadcast.value.value)
Contributor

Let's avoid copying (and do it only if we modify it) -- it's not cheap
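i.e., something like (a sketch; copy only on the mutating path):

```scala
import org.apache.hadoop.conf.Configuration

// `new Configuration(other)` clones every entry, so read the broadcast
// value directly and pay the copy cost only when a modification is needed.
def confFor(broadcastConf: Configuration, needsMutation: Boolean): Configuration =
  if (needsMutation) new Configuration(broadcastConf) else broadcastConf
```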


/**
* Two cases will use this to iterate over the records:
* 1) extract the change data from the base file directly, including 'ADD_BASE_File' and 'REMOVE_BASE_File'.
Contributor

@YannByron need to update this to align w/ HoodieCDCInferenceCase

private var currentInstant: HoodieInstant = _

// The change file that is currently being processed
private var currentChangeFile: HoodieCDCFileSplit = _
Contributor

nit: currentCDCFileSplit

FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
cdcFileSplit = new HoodieCDCFileSplit(BASE_FILE_DELETE, null, Option.empty(), Option.of(beforeFileSlice));
} else if (writeStat.getNumUpdateWrites() == 0L && writeStat.getNumDeletes() == 0
&& writeStat.getNumWrites() == writeStat.getNumInserts()) {
Contributor

@YannByron there's an issue right now where we undercount inserts (AFAIR), and so numWrites != numUpdates + numInserts. We need to be careful with these conditionals (and address the underlying issue as well).

* Hoodie CDC Relation extends Spark's [[BaseRelation]], providing the schema of CDC
* and the [[buildScan]] to return the change data in a specified range.
*/
class CDCRelation(
Contributor

Let's make sure this is rebased onto HoodieBaseRelation

(cdcLogRecordIterator == null || !cdcLogRecordIterator.hasNext)
}

@tailrec final def hasNextInternal: Boolean = {
Contributor

I think we should split CDCFileGroupIterator into N iterators, one for every HoodieCDCInferenceCase, to make it more manageable and easier to understand.
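Roughly along these lines (a sketch; the dispatch and names are hypothetical):

```scala
// One small iterator per inference case, picked once per file split,
// instead of a single large tail-recursive hasNext.
def iteratorFor[T](inferCase: String,
                   baseFileRows: Iterator[T],
                   logRecords: Iterator[T]): Iterator[T] =
  inferCase match {
    case "BASE_FILE_INSERT" => baseFileRows // every row surfaces as an `i` record
    case "LOG_FILE"         => logRecords   // log records merged against the before-image
    case other              => throw new IllegalArgumentException(s"unknown case: $other")
  }
```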

// - Projected schema
// As such, no particular schema could be assumed, and therefore we rely on the caller
// to correspondingly set the schema of the expected output of the base-file reader
private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record")
Contributor

Hi @YannByron

Recently, we found an Avro schema issue that was caused by a wrong record name (details here: #7284).

May I ask if this line could cause the same problem? If so, we can discuss how to fix it in PR #7297.

Contributor Author

The code is just moved from another class, so I am not sure whether it works correctly in all cases.
But I have solved a very similar problem caused by the Avro namespace: 60b62fc.

Contributor

Got it, thanks @YannByron

I reviewed the code recently; it turns out this Avro schema baseFileReaderAvroSchema is only used for resolveNullableType in AvroSerializer. It won't be involved in any serialization/deserialization process, so it's okay to use "record" as the Avro schema name.
