@lhotari lhotari commented Aug 27, 2021

Motivation

I'm investigating an issue where setting the batch builder for a Debezium Postgres source seems to have no effect, even after applying the fixes from #11706 that add support for --batch-builder KEY_BASED on sources.
While reading the code, I found a special condition that ignores the source record key:

if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema &&
        ((KeyValueSchema) record.getSchema()).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED)) {
    msg.key(record.getKey().get());
}

Modifications

Remove the extra condition. Always set the output message key when the source record has a key.
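
To make the proposed change concrete, here is a minimal sketch that models the condition as two plain predicates. It is a simplification under stated assumptions: `SourceKeyCondition`, its `Encoding` enum, and both method names are hypothetical and do not exist in Pulsar; the boolean arguments stand in for the `record.getKey().isPresent()` and `instanceof KeyValueSchema` checks quoted above.

```java
// Hypothetical, simplified model of the condition quoted above; not Pulsar code.
final class SourceKeyCondition {
    // Stand-in for KeyValueEncodingType (assumption: only these two values matter here).
    enum Encoding { INLINE, SEPARATED }

    private SourceKeyCondition() {}

    // Existing behaviour: the record key is skipped when the schema is a
    // KeyValue schema that uses SEPARATED encoding.
    static boolean setsKeyCurrently(boolean hasKey, boolean isKeyValueSchema, Encoding encoding) {
        return hasKey && !(isKeyValueSchema && encoding == Encoding.SEPARATED);
    }

    // Behaviour proposed by this PR: the key is set whenever the record has one.
    static boolean setsKeyAsProposed(boolean hasKey) {
        return hasKey;
    }
}
```

The two predicates differ only for a record that has a key and a KeyValue schema with SEPARATED encoding, which is the case this PR is about.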

@lhotari lhotari added this to the 2.9.0 milestone Aug 27, 2021
@lhotari lhotari self-assigned this Aug 27, 2021

lhotari commented Aug 27, 2021

This special logic was added by #6034. @tuteng, do we need this special condition? I'm proposing to remove it with this PR. Are you fine with that?

lhotari commented Aug 27, 2021

If the concern is the storage space that the key consumes in the output message, the message key could be calculated as a hash of the source record key.
I guess the key size might become an issue in certain cases?
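
A minimal sketch of the hashing idea, assuming SHA-256 with hex output is acceptable as a message key. `KeyHasher` and `hashKey` are hypothetical names for illustration and are not part of Pulsar; the choice of digest is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of Pulsar: derives a fixed-size message key
// from an arbitrarily large source record key.
final class KeyHasher {
    private KeyHasher() {}

    // Returns the SHA-256 digest of the key as a 64-character lowercase hex string.
    static String hashKey(String sourceKey) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(sourceKey.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this is not expected.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Equal source keys still map to the same message key, so key-based batching and routing would keep grouping them together, but the original key could no longer be recovered from the message key.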

lhotari commented Aug 27, 2021

Also found this kind of logic:

if (schema instanceof KeyValueSchemaImpl && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
    KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
    checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
            "This method is not allowed to set keys when in encoding type is SEPARATED");
    if (key == null) {
        msgMetadata.setNullPartitionKey(true);
        return this;
    }
}

lhotari commented Aug 27, 2021

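Oh, now I understand how this works. With SEPARATED encoding, the message key is derived from the key of the KeyValue when the value is set: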

if (value instanceof org.apache.pulsar.common.schema.KeyValue
        && schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
    KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
    org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
    if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
        // set key as the message key
        if (kv.getKey() != null) {
            msgMetadata.setPartitionKey(
                    Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
            msgMetadata.setPartitionKeyB64Encoded(true);
        } else {
            this.msgMetadata.setNullPartitionKey(true);
        }
        // set value as the payload
        if (kv.getValue() != null) {
            this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
        } else {
            this.msgMetadata.setNullValue(true);
        }
        return this;
    }
}

Closing this PR.
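
The SEPARATED branch quoted above can be sketched without any Pulsar dependency. This is an illustration only: `SeparatedKeySketch` is a hypothetical stand-in for the message builder and its metadata, and the byte arrays stand in for the output of the key and value schemas' `encode` calls.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical stand-in for the builder state touched by the snippet above;
// it is not the Pulsar message builder or its metadata class.
final class SeparatedKeySketch {
    String partitionKey;
    boolean partitionKeyB64Encoded;
    boolean nullPartitionKey;
    boolean nullValue;
    byte[] payload;

    // Mirrors the SEPARATED branch: the encoded key becomes the Base64
    // partition key and the encoded value becomes the payload.
    static SeparatedKeySketch build(byte[] encodedKey, byte[] encodedValue) {
        SeparatedKeySketch msg = new SeparatedKeySketch();
        if (encodedKey != null) {
            msg.partitionKey = Base64.getEncoder().encodeToString(encodedKey);
            msg.partitionKeyB64Encoded = true;
        } else {
            msg.nullPartitionKey = true;
        }
        if (encodedValue != null) {
            msg.payload = encodedValue.clone();
        } else {
            msg.nullValue = true;
        }
        return msg;
    }

    // Recovers the original key bytes from the Base64 partition key.
    byte[] decodedKey() {
        return partitionKey == null ? null : Base64.getDecoder().decode(partitionKey);
    }

    public static void main(String[] args) {
        SeparatedKeySketch msg = build(
                "id-42".getBytes(StandardCharsets.UTF_8),
                "{\"name\":\"a\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(msg.partitionKey);
        System.out.println(new String(msg.decodedKey(), StandardCharsets.UTF_8));
    }
}
```

Because the key is already carried as the message key in this path, a separate call that sets the key for a SEPARATED KeyValue schema is redundant, and the second snippet above shows that such a call is rejected.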

@lhotari lhotari closed this Aug 27, 2021