KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally #10537

Merged: guozhangwang merged 3 commits into apache:trunk from spena:change_time_ordered_key_schema on Apr 18, 2021

@spena spena commented Apr 14, 2021

This PR changes the TimeOrderedKeySchema composite key from time-seq-key to time-key-seq so that duplicated time-key records can be deleted with the RocksDB deleteRange API. It also removes all duplicates when put(key, null) is called. Previously, put(key, null) was a no-op, which caused problems because there was no way to delete any keys when duplicates are allowed.
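To illustrate why the field order matters, here is a minimal sketch that serializes the same three records under both layouts and sorts them bytewise. The class and method names are hypothetical, and the field widths (8-byte timestamp, 4-byte sequence number) and the unsigned lexicographic ordering are assumptions made for the example, not taken from this PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch, not the actual TimeOrderedKeySchema code.
public class CompositeKeyLayoutSketch {

    // Assumption: the store orders keys as unsigned bytes, lexicographically.
    public static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    // Old layout: 8-byte timestamp, 4-byte sequence number, then the record key.
    public static byte[] timeSeqKey(final long time, final int seq, final String key) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES + k.length)
            .putLong(time).putInt(seq).put(k).array();
    }

    // New layout: 8-byte timestamp, the record key, then the 4-byte sequence number.
    public static byte[] timeKeySeq(final long time, final String key, final int seq) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + k.length + Integer.BYTES)
            .putLong(time).put(k).putInt(seq).array();
    }

    // Returns a copy of the given keys sorted in bytewise order.
    public static byte[][] sorted(final byte[]... keys) {
        final byte[][] copy = keys.clone();
        Arrays.sort(copy, BYTEWISE);
        return copy;
    }

    public static void main(final String[] args) {
        // Two duplicates of key "a" and one record for key "b", all at time 10.
        final byte[][] oldOrder = sorted(
            timeSeqKey(10L, 0, "a"), timeSeqKey(10L, 1, "a"), timeSeqKey(10L, 0, "b"));
        final byte[][] newOrder = sorted(
            timeKeySeq(10L, "a", 0), timeKeySeq(10L, "a", 1), timeKeySeq(10L, "b", 0));
        for (final byte[] k : oldOrder) {
            System.out.println("time-seq-key: " + Arrays.toString(k));
        }
        for (final byte[] k : newOrder) {
            System.out.println("time-key-seq: " + Arrays.toString(k));
        }
    }
}
```

Under time-seq-key the record for "b" sorts between the two duplicates of "a", so no single contiguous range covers only the duplicates. Under time-key-seq the duplicates of "a" are adjacent.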

The RocksDB deleteRange(keyFrom, keyTo) deletes a range of keys from keyFrom (inclusive) to keyTo (exclusive). To make keyTo inclusive, I incremented the end key by one when calling the RocksDBAccessor.
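The inclusive-end trick can be sketched as follows. This is not the code in this PR: a TreeMap stands in for the RocksDB store, and the class, the increment helper, and the prefix layout (8-byte timestamp followed by the key) are hypothetical names and assumptions chosen for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for the RocksDB store.
public class InclusiveRangeDeleteSketch {

    // Assumption: keys are ordered as unsigned bytes, lexicographically.
    public static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    public static TreeMap<byte[], String> newStore() {
        return new TreeMap<byte[], String>(BYTEWISE);
    }

    // Assumed time-key prefix: 8-byte timestamp followed by the record key.
    public static byte[] prefix(final long time, final String key) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + k.length).putLong(time).put(k).array();
    }

    // Assumed full store key: the time-key prefix followed by a 4-byte sequence number.
    public static byte[] storeKey(final long time, final String key, final int seq) {
        final byte[] p = prefix(time, key);
        return ByteBuffer.allocate(p.length + Integer.BYTES).put(p).putInt(seq).array();
    }

    // Treats the array as a big-endian unsigned number and returns that number plus one.
    public static byte[] increment(final byte[] input) {
        final byte[] out = input.clone();
        for (int i = out.length - 1; i >= 0; i--) {
            if (out[i] != (byte) 0xFF) {
                out[i]++;
                return out;
            }
            out[i] = 0; // carry into the next more significant byte
        }
        throw new IllegalArgumentException("cannot increment a key made only of 0xFF bytes");
    }

    // Stand-in for deleteRange: from is inclusive, toExclusive is exclusive.
    public static void deleteRange(final TreeMap<byte[], String> store,
                                   final byte[] from, final byte[] toExclusive) {
        store.subMap(from, true, toExclusive, false).clear();
    }

    // Removes every stored key that starts with the time-key prefix.
    // Caveat of this sketch: a longer record key that begins with the same bytes
    // would be removed too, so the demo uses record keys of equal length.
    public static void removeDuplicates(final TreeMap<byte[], String> store,
                                        final long time, final String key) {
        final byte[] p = prefix(time, key);
        deleteRange(store, p, increment(p));
    }

    public static void main(final String[] args) {
        final TreeMap<byte[], String> store = newStore();
        store.put(storeKey(10L, "a", 0), "a-first");
        store.put(storeKey(10L, "a", 1), "a-duplicate");
        store.put(storeKey(10L, "b", 0), "b-first");
        removeDuplicates(store, 10L, "a");
        System.out.println(store.values());
    }
}
```

Passing the prefix itself as the end key would delete nothing, since every full key sorts after its own prefix and the end is exclusive. Incrementing the prefix moves the exclusive bound just past the last key that starts with it.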

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

}

@Override
public void remove(final Bytes key, final long timestamp) {
Contributor Author

I wasn't sure how to name this method. I initially called it removeRange(key, from, to), but I don't want to support a time range for a specific key, because with the time-ordered key schema that would also delete other keys that sort between from-key and to-key.

So I thought of using just one timestamp, to make sure this is not called with a time range. But removeRange(key, timestamp) does not look like a range, so I ended up just calling it remove. Any thoughts?

Contributor

I think just calling remove is totally fine :)
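The concern raised in this thread about a per-key time range can be sketched as follows. A TreeMap again stands in for the store. The class name, the helpers, and the assumed time-key-seq layout (8-byte timestamp, key, 4-byte sequence number) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of why a (key, from, to) range is unsafe when time sorts first.
public class TimeRangeHazardSketch {

    // Assumption: keys are ordered as unsigned bytes, lexicographically.
    public static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    public static TreeMap<byte[], String> newStore() {
        return new TreeMap<byte[], String>(BYTEWISE);
    }

    // Assumed time-key-seq layout: 8-byte timestamp, record key, 4-byte sequence number.
    public static byte[] storeKey(final long time, final String key, final int seq) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + k.length + Integer.BYTES)
            .putLong(time).put(k).putInt(seq).array();
    }

    // Lists the values that a range from (from, key) to (to, key) would cover.
    public static List<String> coveredBy(final TreeMap<byte[], String> store,
                                         final String key, final long from, final long to) {
        final byte[] lo = storeKey(from, key, 0);
        final byte[] hi = storeKey(to, key, Integer.MAX_VALUE);
        return new ArrayList<>(store.subMap(lo, true, hi, true).values());
    }

    public static void main(final String[] args) {
        final TreeMap<byte[], String> store = newStore();
        store.put(storeKey(10L, "a", 0), "t=10 a");
        store.put(storeKey(11L, "b", 0), "t=11 b");
        store.put(storeKey(12L, "a", 0), "t=12 a");
        // The range for key "a" between t=10 and t=12 also covers key "b" at t=11.
        System.out.println(coveredBy(store, "a", 10L, 12L));
    }
}
```

Because the timestamp is the leading field, every record with a timestamp strictly between from and to falls inside the range regardless of its key, which is why a single timestamp is the safer contract for remove.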

@guozhangwang guozhangwang left a comment

LGTM overall! Just minor comments.

* @param timestamp
* @return The key that represents the prefixed Segmented key in bytes.
*/
default Bytes toBinary(final Bytes key, long timestamp) {
Contributor

nit: how about "toStoreBinaryKeyPrefix"?

Contributor Author

Done

  * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
- * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
- * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ * key into a schema combined of (time,key,seq).
Contributor

nit: Add a note that, since the key is variable length while time and seq are fixed length, a query over a varying time range would be very inefficient with this ordering, because we'd need to be very conservative in picking the from/to boundaries. For now, however, we do not expect any varying time range access at all, only fixed time ranges.

Contributor Author

Done

throw new UnsupportedOperationException();
}

public static Bytes toStoreKeyBinary(final Bytes key,
Contributor

ditto: toStoreKeyBinaryPrefix.

Contributor Author

Done

spena commented Apr 16, 2021

Thanks @guozhangwang, I applied the changes.

@guozhangwang

Thanks @spena, I will merge after green builds.

@guozhangwang guozhangwang merged commit 15c24da into apache:trunk Apr 18, 2021
@guozhangwang

Merged to trunk, thanks @spena!
