
KAFKA-3522: Add RocksDBTimestampedStore#6149

Merged
mjsax merged 6 commits into apache:trunk from mjsax:kafka-3522-rocksdb-format-rocksdbwithtimestampstore
Jan 31, 2019

Conversation

@mjsax (Member) commented Jan 15, 2019

Part of KIP-258. This PR adds only internal classes (we can merge right away).

@mjsax mjsax added the streams label Jan 15, 2019
@mjsax (Member Author) commented Jan 15, 2019

Call for review @guozhangwang @bbejeck @vvcephei

Member Author:

The purpose of this test is to catch interface changes when we upgrade RocksDB. Using reflection, we make sure that RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsFacade maps all methods from DBOptions and ColumnFamilyOptions to/from Options correctly.
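
The shape of such a reflection check can be sketched as follows. The stand-in classes and the missingMethods helper below are hypothetical; the real test reflects over org.rocksdb's DBOptions, ColumnFamilyOptions, and Options instead:

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class FacadeParityCheck {

    // Hypothetical stand-ins; the real test uses org.rocksdb.DBOptions and the facade class.
    public static class DbOptionsStandIn {
        public void setMaxOpenFiles(final int n) { }
        public void setCreateIfMissing(final boolean b) { }
    }

    public static class FacadeStandIn {
        public void setMaxOpenFiles(final int n) { }
        public void setCreateIfMissing(final boolean b) { }
    }

    // Collects the method names declared on `source` that `target` does not declare.
    public static Set<String> missingMethods(final Class<?> source, final Class<?> target) {
        final Set<String> targetNames = Arrays.stream(target.getDeclaredMethods())
            .map(Method::getName)
            .collect(Collectors.toSet());
        return Arrays.stream(source.getDeclaredMethods())
            .map(Method::getName)
            .filter(name -> !targetNames.contains(name))
            .collect(Collectors.toSet());
    }

    public static void main(final String[] args) {
        final Set<String> missing = missingMethods(DbOptionsStandIn.class, FacadeStandIn.class);
        if (!missing.isEmpty()) {
            throw new AssertionError("facade is missing methods: " + missing);
        }
        System.out.println("all methods covered");
    }
}
```

Deleting one method from the facade side makes such a check fail, which is how the real test can be verified manually.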

Contributor:

very nice!

Actually, can you add that statement as a class javadoc, for posterity?

Contributor:

Was it possible for you to test the test? I.e., how do you know the test does what it's supposed to do?

Member Author:

I tested it "manually" by removing one method in RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsFacade -- this makes the test fail. Is this sufficient for you?

Member Author:

\cc @vvcephei

Contributor:

Yep! Sorry, I lost your reply in the shuffle.

That's sufficient. It looked right to me, just wanted to make sure we've seen it fail at least once.

Member Author:

In contrast to RocksDBStore, we use column families. Thus, we cannot pass in the Options object that users use to specify custom RocksDB options via StreamsConfig; instead, we need to use DBOptions and ColumnFamilyOptions -- thus, we need to translate from Options into those two classes via this helper class.

Member Author:

We try to open the DB with both column families -- this might fail if only one exists. In this case, we create the missing CF and then retry opening the DB.

Member Author:

All operations are defined over both CFs; i.e., we do dual put/get operations to migrate data from the default CF (which does not store any timestamps) to the new CF.
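
A minimal sketch of this dual-CF pattern, using two in-memory maps as stand-ins for the RocksDB column families (names are illustrative; the real accessor works on ColumnFamilyHandles and also attaches a surrogate timestamp when reading from the old CF):

```java
import java.util.Map;
import java.util.TreeMap;

public class DualCfSketch {

    public final Map<String, byte[]> oldCf = new TreeMap<>(); // plain values, no timestamp
    public final Map<String, byte[]> newCf = new TreeMap<>(); // timestamp-prefixed values

    // put always writes to the new CF and removes any stale copy from the old CF,
    // so data migrates to the new format as it is rewritten
    public void put(final String key, final byte[] valueWithTimestamp) {
        newCf.put(key, valueWithTimestamp);
        oldCf.remove(key);
    }

    // get checks the new CF first and falls back to the old CF on a miss
    public byte[] get(final String key) {
        final byte[] value = newCf.get(key);
        return value != null ? value : oldCf.get(key);
    }

    public static void main(final String[] args) {
        final DualCfSketch store = new DualCfSketch();
        store.oldCf.put("k1", new byte[] {1});                    // pre-existing old-format record
        store.put("k1", new byte[] {0, 0, 0, 0, 0, 0, 0, 42, 1}); // rewrite migrates it
        if (store.oldCf.containsKey("k1")) {
            throw new AssertionError("stale copy should have been removed from the old CF");
        }
        System.out.println("value now lives in the new CF only");
    }
}
```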

Member Author:

This class will get more helper methods later...

Member Author:

This and other stuff in this class are only small code cleanups and reordering of methods. There is no actual change.

Contributor:

👍 Thanks for doing it.

@bbejeck (Member) left a comment

Thanks for the hard work @mjsax. I made an initial pass and left some comments.

Member:

nit: I personally don't like double-brace initialization for Java collections, but since this is subjective, feel free to ignore this comment.

Member:

return an empty array instead?

Member Author:

null indicates not found and should be correct. Let me know if you disagree.

Member:

Yeah, I agree; I forgot about returning null from the store to indicate not found.

Member:

why two calls here to db.compactRange? Do we need to make a call for each column family?

Member Author:

An oversight on my side. I think we can just call it once (will remove the duplicate line).

The docs are not very specific about whether this triggers compaction for all CFs, but I believe it does. There is also an API to compact a single CF, but I don't see any reason to use it. Thoughts?

Member Author:

Thinking about this once more, I actually believe that this would only compact the default CF -- I'll add this to the RocksDBAccessor interface and pass in the CF handles.

Member:

For the operations involving the noTimestampColumnFamily, maybe we could put a guard condition around each operation, e.g. if (noTimestampColumnFamilyNotEmpty) {..}, where the boolean flag is set when opening the DB by checking the size of the old column family with something like approximateNumEntries, but only for the noTimestampColumnFamily.

Member Author:

Addressed this with the new RocksDBAccessor interface.

Member:

Will this work with callers? Will we inspect each value and insert an unknown timestamp for records from the default column family?

EDIT: Never mind, I didn't see the RocksDbIterator class below when writing this comment.

Member Author:

Ok. This is an actual change. We need to make this class static to reuse it in the new RocksDBTimestampedStore class. For this reason, we need to pass openIterators into the constructor now.

Contributor:

Makes sense. If it's static, then we could also extract it to a separate file, which might be better than having one of the siblings define a class that both siblings use.

Also, it would reduce the LOC in this class, which is nice for readability.

Member Author:

Ack

@mjsax mjsax force-pushed the kafka-3522-rocksdb-format-rocksdbwithtimestampstore branch from 2102fef to 391caa9 on January 17, 2019 22:21
@mjsax (Member Author) commented Jan 17, 2019

Updated this with a RocksDBAccessor interface, which allows us to implement single/dual column-family access depending on whether there is data in the old column family or not.

@bbejeck (Member) left a comment

Thanks for the updates @mjsax. I've made another pass, and I like the approach of determining when to stop using both column families.

Regarding a test for RocksDBTimestampedStore, I have a proposal:

  1. Open a plain RocksDB, give it the two expected column families, and populate RocksDB.DEFAULT_COLUMN_FAMILY with a known number of records; then close the plain RocksDB.
  2. Open an instance of RocksDBTimestampedStore, do a series of gets on the known keys, and assert all return with the unknown timestamp; then close.
  3. Open the plain RocksDB again and assert that RocksDB.DEFAULT_COLUMN_FAMILY is empty.

WDYT?

@mjsax mjsax changed the title from "KAFKA-3522: Add RocksDBWithTimestampsStore" to "KAFKA-3522: Add RocksDBTimestampedStore" on Jan 18, 2019
@mjsax (Member Author) commented Jan 18, 2019

@bbejeck Added a test as requested :)

Member Author:

restoreAllInternal() is only called here, so I think it's better to inline it.

Contributor:

+1

Contributor:

Now I see why we need to make name package-private; I think it is okay.

Member Author:

This test made checkstyle fail because the method was too long. I extracted part of it into prepareOldStore() to fix checkstyle.

Member Author:

Similar to above: extracted into its own method to avoid the checkstyle issue.

@bbejeck (Member) left a comment

Thanks for the updates and test @mjsax. Overall looks good; I'd like to make one more pass.

@vvcephei (Contributor) left a comment

Thanks for the PR @mjsax .

Overall, it looks good. I left a few comments...

Contributor:

Neat!

Although it seems like more of an adapter than a facade ;)

Member Author:

Ack. Renaming.

Contributor:

I couldn't find the reference that makes it necessary to make this public...

Member Author:

Ack. Reverting.



Contributor:

This comment seems misplaced. IIUC, the class already restricts the key type to Bytes.

Member Author:

c&p issue (from RocksDBStore). At some point the class had generics, and it seems that while refactoring we missed updating this JavaDoc. Fixing it in both classes.

Contributor:

I don't think it matters. This implementation is pretty straightforward; it doesn't seem like the ByteBuffer really has any advantage.

If the others agree, we should get rid of the todo.

Member Author:

It's the same question as #6151 (comment)

I would like to get a uniform agreement/policy on what to use and will update the code accordingly once we make a decision. \cc @guozhangwang

Contributor:

I'd say just remove the TODO; using either is fine here.

Member Author:

As discussed in person, I updated the code to use ByteBuffer, which should be our default API for modifying byte arrays.
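
For illustration, a timestamp-prefixed value layout (an 8-byte timestamp followed by the plain value) can be handled with ByteBuffer like this; the method names and the -1 surrogate for "unknown timestamp" are assumptions of this sketch, not the exact internal API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimestampedValueSketch {

    public static final long UNKNOWN_TIMESTAMP = -1L; // assumed surrogate for migrated records

    // prepend the 8-byte timestamp to the plain value
    public static byte[] withTimestamp(final long timestamp, final byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length)
            .putLong(timestamp)
            .put(plainValue)
            .array();
    }

    public static long timestampOf(final byte[] valueWithTimestamp) {
        return ByteBuffer.wrap(valueWithTimestamp).getLong();
    }

    public static byte[] plainValueOf(final byte[] valueWithTimestamp) {
        final byte[] plain = new byte[valueWithTimestamp.length - 8];
        ByteBuffer.wrap(valueWithTimestamp, 8, plain.length).get(plain);
        return plain;
    }

    public static void main(final String[] args) {
        final byte[] value = withTimestamp(1_546_300_800_000L, "hello".getBytes(StandardCharsets.UTF_8));
        if (timestampOf(value) != 1_546_300_800_000L) {
            throw new AssertionError("timestamp round-trip failed");
        }
        System.out.println(new String(plainValueOf(value), StandardCharsets.UTF_8));
    }
}
```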



@mjsax mjsax force-pushed the kafka-3522-rocksdb-format-rocksdbwithtimestampstore branch from f9a5a3b to 110cbf1 on January 23, 2019 00:42
@mjsax (Member Author) commented Jan 23, 2019

Updated this.

@guozhangwang (Contributor) left a comment

Made a pass on non-testing code.

Contributor:

Out of the scope of this PR: we should consider having a KIP in the next release to refactor the config-setter API as:

public interface RocksDBConfigSetter {

    /**
     * Set the RocksDB options for the provided storeName.
     *
     * @param storeName the name of the store being configured
     * @param dbOptions the RocksDB DBOptions
     * @param cfOptions the RocksDB ColumnFamilyOptions
     * @param configs   the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig}
     */
    void setConfig(final String storeName, final DBOptions dbOptions, final ColumnFamilyOptions cfOptions, final Map<String, Object> configs);
}

And then we can choose to call the other constructor of

public Options(final DBOptions dbOptions,
      final ColumnFamilyOptions columnFamilyOptions) {..}

Member Author:

Sure. Can you please create a Jira for tracking? Otherwise it might get dropped.

Contributor:

Looking at https://github.com/facebook/rocksdb/blob/master/options/options.cc#L367 there seems to be no code for setting the memtable config here.

I'd suggest we just mimic the code there instead of following the FAQ, since that page may well be obsolete.

Member Author:

Ack. Will update the comment accordingly.

Contributor:

+1

Contributor:

Meta: this and the existing class still share a lot of common code; I'm wondering if it is possible to just add a flag to the existing class, based on which we can decide whether to use Options vs. DBOptions / CFOptions, and whether to use RocksDBAccessor.

Member Author:

Yes. That works. We can share more code.

Contributor:

Just following up on my other idea about collapsing into a single class here: maybe instead of naming it keyValueWithTimestamp, we just name it as:

  1. "default" -> version 2.1-
  2. "2.2" -> version 2.2 to now.

And the flag can just be indicating if it is 1) or 2) above; in the future if we need to do this again we can then have:

  1. "default" -> version 2.1-
  2. "2.2" -> version 2.2 - 2.5 (just made that up).
  3. "3.0" -> version 3.0 - now.

etc.

Member Author:

We cannot rename the default column family (and we cannot delete it either) -- it's there all the time. I can still rename the new CF to "2.2" if you want. Let me know.

Contributor:

I'd suggest try-catching each line separately, since the underlying RocksDBException would not tell you which line actually went wrong, and this piece of info would be very useful for troubleshooting; ditto below.

Contributor:

I think we can still use write(batch); here for efficiency?

Member Author:

write(batch) should be done by the caller -- that we call it in the other accessor is a bug -- will rename the method to prepareBatch().

Contributor:

Not sure I can follow this: could you elaborate a bit more? I was thinking to use RocksDB's

void write(final WriteOptions writeOpts, final WriteBatch updates)

API for the putAll call, is it possible?

Member Author:

That is exactly what is used. Compare RocksDB#putAll():

    @Override
    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
        try (final WriteBatch batch = new WriteBatch()) {
            dbAccessor.prepareBatch(entries, batch);
            write(batch);
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + name, e);
        }
    }

It calls dbAccessor.prepareBatch(entries, batch); (already renamed from putAll to prepareBatch), which is this method. We do the write call "outside" because only how the batch is prepared differs.

Does this make sense?

Contributor:

Yup.


Contributor:

Which do you think is more accurate, this or db.getLongProperty("rocksdb.estimate-num-keys"); directly?

Member Author:

If no CF is specified, only the default CF is accessed, not all CFs -- thus, we need to add up both.

Member Author:

I extended RocksDBTimestampedStoreTest accordingly to test approximateNumEntries(), too.

Contributor:

We can probably save this class if we consolidate on one RocksDBStore.

Contributor:

UPDATE: can we just modify RocksDbKeyValueBytesStoreSupplier to allow passing in a flag, based on which we will construct RocksDB vs. TRocksDB?

Member Author:

Ack

@mjsax mjsax force-pushed the kafka-3522-rocksdb-format-rocksdbwithtimestampstore branch from 300dcc2 to 8d74b82 on January 25, 2019 07:08
@mjsax (Member Author) commented Jan 25, 2019

Updated this. Reworked RocksDBStore and RocksDBTimestampedStore to share more code. Both use RocksDBAccessor now. Also consolidated the two corresponding tests.

Rebased to get BloomFilter changes that got merged recently.

@mjsax mjsax force-pushed the kafka-3522-rocksdb-format-rocksdbwithtimestampstore branch from 8d74b82 to c854ee8 on January 25, 2019 07:16
@guozhangwang (Contributor) left a comment

Made another pass over the PR.

}

@Override
public synchronized KeyValue<Bytes, byte[]> next() {
Contributor:

Though this is not introduced in this PR: this function seems not to be needed?

Member Author:

Yes. Maybe we should add a check whether the iterator is open? Similar to hasNext()?

Member Author:

Never mind. super.next() calls hasNext() anyway. This part is covered. Removing this method.


final String name;
private final String parentDir;
private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
Contributor:

Why does openIterators need to be package-private now?

public synchronized byte[] get(final Bytes key) {
validateStoreOpen();
try {
return dbAccessor.get(key.get());
Contributor:

For put / putAll etc., the exception is captured inside the dbAccessor, while for get it is captured in the caller; is this intentional?

Contributor:

If not, we should move the exception capturing logic inside the dbAccessor as well.

Member Author:

Fair question. If we move all try-catch into the dbAccessor, we need to duplicate more code. For example, db.flush() could throw, and catching the exception in RocksDB#flush() happens in a single place -- if we moved it into the dbAccessor, we would need to try-catch in both implementations.

Thoughts?

Member Author:

put is different, because you requested to catch put() and get() within DualAccessor separately -- thus, I moved the code inside.

putAll() also throws (the main pattern is that the caller catches).

final byte[] value = get(key);
final byte[] oldValue;
try {
oldValue = dbAccessor.getOnly(key.get());
Contributor:

Ditto here.

}
}

private class DualColumnFamilyAccessor implements RocksDBAccessor {
Contributor:

nit: maybe we can make it just a general accessor that takes two parameters: oldCF and newCF? Or we can do this generalization in the future if you'd like to hard-code for now.

Member Author:

If we make a single one, how does it know whether it needs to upgrade stuff or not? This seems to imply a null check for each operation to decide whether both or only one CF should be accessed. That would imply runtime overhead. Also, I think the code would be a little bit harder to read. Thoughts?

Contributor:

We will create the DualColumnFamilyAccessor only at construction time, if we find there are two CFs and the old CF is not empty, right?

Member Author:

Yes.

Contributor:

So that means we do not need to do an extra check inside this accessor impl, right?

Member:

+1 from me as well for putting both accessors into separate classes

Contributor:

I looked at it closer. I still think it's better to split them out, but I also don't think it's a correctness issue right now, so I'd be fine with merging what you have.

Member Author:

Let's do some refactoring as follow-ups. It's internal anyway. I would like to keep the PRs focused to get stuff in and merged :)

Contributor:

Actually, I was only suggesting to make the current dual accessor more general: currently it assumes the old CF to be default and the new one to be withTimestamp. What I was suggesting is only to make these two parameterized, so that in the future we only have two accessor impls:

  1. XX-CF only, which we already do in this PR.
  2. XX-to-YY upgrade: an accessor upgrading from an old XX CF to a YY CF.

All that being said, I'm okay with such refactoring as follow-ups.

Member Author:

This refactoring is already done: DualAccessor has a constructor with two CF parameters.

There is one missing piece: the conversion function is hard-coded. After the TimestampedByteStore PR is merged, I will refactor this to pass in a RecordConverter and will use TimestampedByteStore#convertValueToNewFormat().

nextNoTimestamp = null;
iterNoTimestamp.next();
} else {
if (comparator.compare(nextNoTimestamp.key.get(), nextWithTimestamp.key.get()) <= 0) {
Contributor:

if (comparator.compare(nextNoTimestamp.key.get(), nextWithTimestamp.key.get()) == 0) we need to advance on both ends while only returning the one from with-timestamp iterator, otherwise we may get duplicates returned.

Member Author:

This should never happen, because each key should be stored only once, either in the old or the new CF. Or am I missing something?

Contributor:

I think it is still possible. Here's one scenario:

  1. last checkpoint at offset 100; all writes go to the old CF.
  2. continue writing to the old CF until offset 110, but no checkpoint written yet.
  3. non-graceful shutdown happens, and upon restarting the new CF is used.
  4. we start restoring from offset 100 to log-end-offset 110, into the new CF.

Now we end up with data of offsets 100-110 in both CFs.
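
Given that duplicates are possible, the tie rule suggested above (on an equal key, return the with-timestamp copy and advance both iterators) can be sketched over two sorted key lists; this helper is illustrative, not the actual iterator code:

```java
import java.util.ArrayList;
import java.util.List;

public class MergingIteratorSketch {

    // merges two sorted key lists; on a duplicate key, the with-timestamp side wins
    // and the stale old-CF copy is skipped so it is not returned twice
    public static List<String> mergeKeys(final List<String> noTimestampKeys,
                                         final List<String> withTimestampKeys) {
        final List<String> result = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < noTimestampKeys.size() && j < withTimestampKeys.size()) {
            final int cmp = noTimestampKeys.get(i).compareTo(withTimestampKeys.get(j));
            if (cmp < 0) {
                result.add(noTimestampKeys.get(i++));
            } else if (cmp > 0) {
                result.add(withTimestampKeys.get(j++));
            } else {
                result.add(withTimestampKeys.get(j++)); // prefer the with-timestamp copy
                i++;                                    // drop the duplicate from the old CF
            }
        }
        while (i < noTimestampKeys.size()) {
            result.add(noTimestampKeys.get(i++));
        }
        while (j < withTimestampKeys.size()) {
            result.add(withTimestampKeys.get(j++));
        }
        return result;
    }

    public static void main(final String[] args) {
        // "b" exists on both sides, as after the restore scenario above; it is returned once
        System.out.println(mergeKeys(List.of("a", "b", "c"), List.of("b", "d")));
    }
}
```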

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Do we guarantee that concurrent IQ will not see duplicated results with the db-accessor updating logic as well? If yes, we can save this check.

Member Author

That should be safe. Note that IQ is "disabled" if we are not in the RUNNING state.

Contributor

UPDATE: can we just modify RocksDbKeyValueBytesStoreSupplier to allow passing in a flag, based on which we construct a plain RocksDBStore vs. a RocksDBTimestampedStore?

}

private KeyValue<Bytes, byte[]> getKeyValueNoTimestamp() {
return new KeyValue<>(new Bytes(iterNoTimestamp.key()), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
Contributor

I'd suggest one optimization here, to save array copying every time we scan over the old CF: keep the original key and value bytes from the no-timestamp iterator, and only once we have decided to return the entry from this iterator call getValueWithUnknownTimestamp to do the byte-array copy.

Member Author

Ack.
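The conversion under discussion can be sketched in isolation. This is an assumed layout inferred from the test code below (which serializes ConsumerRecord.NO_TIMESTAMP, i.e. -1, with a LongSerializer): values read from the old CF carry no timestamp, so a surrogate 8-byte big-endian -1 is prepended to produce the new `<timestamp><value>` format. The class name is illustrative, not the actual Kafka class.

```java
import java.nio.ByteBuffer;

public class SurrogateTimestampSketch {
    // surrogate for values migrated from the old, timestamp-less format
    public static final long NO_TIMESTAMP = -1L;

    // Prepend 8 big-endian bytes of NO_TIMESTAMP to the plain value,
    // converting it to the new <timestamp><value> layout.
    public static byte[] getValueWithUnknownTimestamp(final byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length)
            .putLong(NO_TIMESTAMP)
            .put(plainValue)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] converted = getValueWithUnknownTimestamp(new byte[] {42});
        if (converted.length != 9
                || ByteBuffer.wrap(converted).getLong() != NO_TIMESTAMP
                || converted[8] != 42) {
            throw new AssertionError();
        }
        System.out.println(converted.length);
    }
}
```

The suggested optimization above amounts to deferring this `allocate` + copy until the merge logic has actually decided to return the old-CF entry.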



// iterating should not migrate any data, but should return all keys over both CFs (plus surrogate timestamps for the old CF)
final KeyValueIterator<Bytes, byte[]> it = rocksDBStore.all();
Contributor

Could we also test for range(from, to) queries?

Member Author

Ack

Contributor

@vvcephei left a comment

Thanks, @mjsax !

I'm still looking over it, but I wanted to submit one batch of feedback for you.

-John

private ColumnFamilyHandle noTimestampColumnFamily;
private ColumnFamilyHandle withTimestampColumnFamily;

RocksDBTimestampedStore(final String name) {
Contributor

👍 It looks like the two-arg constructor is unused.

final ColumnFamilyOptions columnFamilyOptions) {
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
Contributor

Did @guozhangwang suggest renaming this CF to 2.2? I actually think the descriptive name might be better. It seems like it'll be less work in the long run to remember what exactly is different about the different CFs.

Member Author

He did suggest but there was no final agreement. I personally don't care too much.

Member

what about naming keyValueWithTimestamp_2.2?

Contributor

I don't feel strong about my suggestion either :) Will leave it to anyone who has a strong feeling here.

}
}

private class DualColumnFamilyAccessor implements RocksDBAccessor {
Contributor

Personally, I like the existing design. The DualColumnFamilyAccessor has to do a lot of extra checking that isn't necessary if there's just one cf to deal with. If we collapse them into one class with a flag, we pay for it with a lot of branching.

One thing I did find confusing was reasoning about the fact that the Dual accessor is embedded in this (Timestamped) class, and the Single accessor is embedded in the parent (non-timestamped) class. But, we're using it as an accessor for this (the child) class. This seems unnecessarily convoluted, and it's a little hard to see if it's actually ok, or just coincidentally ok, since the parent and child APIs are only semantically, rather than actually, different.

It seems simpler to understand if we pull both accessors out into separate classes that take db, name, options, etc as constructor arguments, rather than closing over protected state.
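The dual-accessor semantics being debated can be illustrated without RocksDB at all. This is a hypothetical sketch (plain maps stand in for the two column families; names are illustrative, not the actual Kafka classes): writes go to the new CF and delete any stale copy in the old CF, so data migrates lazily on write, while reads check the new CF first and fall back to the old one.

```java
import java.util.HashMap;
import java.util.Map;

public class DualAccessorSketch {
    private final Map<String, String> oldCf = new HashMap<>(); // no-timestamp CF
    private final Map<String, String> newCf = new HashMap<>(); // with-timestamp CF

    // simulates pre-upgrade data written in the old format
    public void putOldFormat(final String key, final String value) {
        oldCf.put(key, value);
    }

    public void put(final String key, final String value) {
        newCf.put(key, value);
        oldCf.remove(key); // lazy migration: drop the old-format copy
    }

    public String get(final String key) {
        final String value = newCf.get(key);
        return value != null ? value : oldCf.get(key);
    }

    public static void main(final String[] args) {
        final DualAccessorSketch store = new DualAccessorSketch();
        store.putOldFormat("key1", "old");
        if (!"old".equals(store.get("key1"))) throw new AssertionError();
        store.put("key1", "new"); // migrates key1 into the new CF
        if (!"new".equals(store.get("key1"))) throw new AssertionError();
        System.out.println(store.get("key1"));
    }
}
```

The delete-on-put is what keeps each key in at most one CF during normal operation, which is the invariant the duplicate-key discussion above hinges on.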

Member

@bbejeck left a comment

Thanks for the update. Overall looks good to me; I want to take another pass over the unit test later this evening.

}
}

private class DualColumnFamilyAccessor implements RocksDBAccessor {
Member

+1 from me as well for putting both accessors into separate classes

public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
final String unknownTimestampString = new String(new LongSerializer().serialize(null, ConsumerRecord.NO_TIMESTAMP));

String getTimestampPrefix() {
Member

nit: this is unused

Member

@bbejeck left a comment

One minor nit, otherwise LGTM.

// approx: 4 entries on old CF, 1 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));

// should add new key10 to new CF
Member

nit: the comment says key10 is added, but the actual key put is key8

}

if (nextNoTimestamp == null) {
if (nextWithTimestamp == null) {
Contributor

Should both iterators also be reporting !isValid here as well? I'm finding the RocksDB iterator API a little confusing...

I guess if we never allow a null key into the store, then this is an effective way to check for the end of the iteration.

Member Author

null keys are not allowed -- that is the assumption in this check. We can add the !isValid check as a safeguard if you want.

Contributor

I don't feel strongly about it. If we enforce the "no null keys" invariant, then they are equivalent.

It seems mildly confusing that we essentially have two different methods of determining when the iterator has run out of data. I leave it up to you.

Member Author

\cc @guozhangwang WDYT?

Member Author

Ack. Updating this.

Contributor

@vvcephei left a comment

Hey @mjsax ,

There are a few minor discussions, but overall, it looks good to me.

Thanks!
-John

@guozhangwang
Contributor

LGTM. @mjsax please feel free to merge after @vvcephei 's comments.

@mjsax
Member Author

mjsax commented Jan 30, 2019

Updated this. Will merge when Jenkins is green.

@mjsax
Member Author

mjsax commented Jan 30, 2019

https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19162/

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.api.AdminClientIntegrationTest.testElectPreferredLeaders(AdminClientIntegrationTest.scala:1273)

Retest this please.

@mjsax mjsax merged commit 73565b7 into apache:trunk Jan 31, 2019
@mjsax mjsax deleted the kafka-3522-rocksdb-format-rocksdbwithtimestampstore branch January 31, 2019 00:13
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020

Labels

kip Requires or implements a KIP streams
