[WIP] KAFKA-7157: Connect TimestampConverter SMT doesn't handle null values#6446

Closed
Nimfadora wants to merge 2 commits intoapache:trunkfrom
Nimfadora:KAFKA-7157
Conversation

@Nimfadora
Contributor

Goal

Introduce null-value handling to TimestampConverter SMT.

Details

The existing org.apache.kafka.connect.transforms.TimestampConverter does not handle null values. When a null value is passed to the SMT, an NPE is thrown. This PR introduces null value handling for this SMT:

  • a schemaless null value will result in a null record value
  • a schemaless null complex object will result in a null record value
  • a null struct (has schema) will result in a null record value with an optional struct schema
  • a null struct (has schema) field will result in a record value with a null field value and an optional struct schema for that field
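The first two cases above boil down to a null guard in the conversion path. The sketch below is hypothetical and uses plain Java types (a Date in, a String out) rather than Kafka Connect's SchemaAndValue machinery, just to illustrate the pass-through behavior; the class and method names are illustrative, not from the PR.

```java
import java.util.Date;

public class NullGuardSketch {
    // Stand-in for the SMT's timestamp conversion: pass nulls through
    // instead of dereferencing them (the NPE described in KAFKA-7157).
    static String convertTimestamp(Date value) {
        if (value == null) {
            return null;            // null in, null out
        }
        return String.valueOf(value.getTime());
    }

    public static void main(String[] args) {
        System.out.println(convertTimestamp(null));          // prints "null"
        System.out.println(convertTimestamp(new Date(0L)));  // prints "0"
    }
}
```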

Important

We assume that the original schema of a null value should carry the optional modifier. Maybe we should be smarter and decide on the optional modifier based on the field's actual nullability.

Testing

Unit tests are provided

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@Nimfadora
Contributor Author

@rhauch could you please review this PR?

@Nimfadora
Contributor Author

@ewencp as this SMT is authored by you, could you please take a look at these changes?

@Nimfadora
Contributor Author

@rhauch @ewencp could you please take a look at this straightforward PR?

Contributor

@rhauch rhauch left a comment

Thanks, @Nimfadora. I like your general approach to this bug, but I think it would be easier to merge if the changes were smaller, with fewer modified lines. If we were just applying this to trunk, that might be okay. But we actually want to backport this, and minimizing the changes might also help make it clear that the behavior is indeed only changing when the timestamps are null. WDYT?

final Map<String, Object> value = requireMap(rawValue, PURPOSE);
final HashMap<String, Object> updatedValue = new HashMap<>(value);
updatedValue.put(config.field, convertTimestamp(value.get(config.field)));
return newRecord(record, null, updatedValue);
Contributor

I understand that the if block always returns, and so an else block is unnecessary. But because we likely want to backport this, and because we'd like to minimize the changes to help ensure the behavior remains the same for non-null timestamps, it's probably worth it to not remove the else block and keep the original indentation.

}
// Value is Struct, only its single field should be converted
if (rawValue == null) {
Schema updatedSchema = updateSchema(originalSchema);
Contributor

This is not using a cached schema, which means that a new schema has to be built for every record that uses some original schema A but has a null timestamp field. Wouldn't it make sense to cache this updated schema? And since the updated schema is not a function of the record, we should be able to look for the updated schema in the cache (or update the schema and cache it) before dealing with the raw value.

This might also mean that there are fewer lines changed, especially if we keep the else block and previous indentation (as mentioned before, even though strictly speaking the else is unnecessary), which might help us ensure that the behavior doesn't change for the non-null timestamps.
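The caching the reviewer suggests can be sketched with a small LRU map keyed by the original schema. Kafka Connect SMTs conventionally use a synchronized LRU schema cache for this, but the hypothetical class below uses only the JDK (with Strings standing in for Schema objects) to show the idea: each distinct original schema is rebuilt at most once.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class SchemaCacheSketch {
    private final Map<String, String> cache;

    public SchemaCacheSketch(final int capacity) {
        // Access-order LinkedHashMap that evicts the eldest entry: a tiny LRU cache.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > capacity;
            }
        };
    }

    // Look up the updated schema for an original schema; build and cache it on a
    // miss, so the per-record work for a known schema is just a map lookup.
    public String updatedSchemaFor(String originalSchema, Function<String, String> updater) {
        return cache.computeIfAbsent(originalSchema, updater);
    }
}
```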


Struct updatedValue = applyValueWithSchema(value, updatedSchema);
return newRecord(record, updatedSchema, updatedValue);
private Schema updateSchema(Schema originalSchema) {
Contributor

Again, if we're only going to call this from one location (see previous comment), maybe it's better to not pull this logic out to minimize changes.

@kkonstantine
Contributor

This fix was actually implemented by #7070, which referenced this initial implementation. KAFKA-7157 has been resolved.

Closing this PR.
