-
Notifications
You must be signed in to change notification settings - Fork 108
WIP: State indexer #488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: State indexer #488
Conversation
… to be consumed by another job
| @@ -0,0 +1,188 @@ | |||
| { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this file or was this just for testing?
| // Create a new view object. | ||
| utxoView, err := NewUtxoView(desoBlockProducer.chain.db, desoBlockProducer.params, | ||
| desoBlockProducer.postgres, desoBlockProducer.chain.snapshot) | ||
| desoBlockProducer.postgres, desoBlockProducer.chain.snapshot, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a rule-of-thumb for when we need to include the event manager or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we always include it except for weird edge-cases. In this case, getBlockTemplate runs outside of the main event loop, and just generates blocks for miners. I would expect mempool may be the one other place where we don't include the EventManager.
lib/block_view.go
Outdated
| TxHash: txHash, | ||
| IsConnected: true, | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have some emit logic at the transaction level? If I just do a basic transfer, do we get all the transaction fields passed through somewhere? Wondering if we can get all the outputs in the new state syncer db, so we can see "oh you sent money to x, y, and z"
lib/block_view_creator_coin.go
Outdated
| }) | ||
|
|
||
| if bav.EventManager != nil && emitMempoolTxn { | ||
| bav.EventManager.mempoolTransactionConnected(&MempoolTransactionEvent{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to emit state changes for balance changes when the creator gets a founder reward?
lib/block_view_flush.go
Outdated
| return bav.FlushToDbWithTxn(txn, blockHeight) | ||
| }) | ||
| if bav.EventManager != nil { | ||
| bav.EventManager.dbFlushed(&DBFlushedEvent{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to look into what this does
lib/state_change_syncer.go
Outdated
| // Get the cached mempool transaction from the connected mempool map. | ||
| if connectedMempoolTx, ok := stateChangeSyncer.ConnectedMempoolTxMap[*event.TxHash]; ok { | ||
| // Check to see if the index in question has a "core_state" annotation in its definition. | ||
| if !isCoreStateKey(connectedMempoolTx.KeyBytes) { | ||
| return | ||
| } | ||
|
|
||
| stateChangeEntry.Encoder = connectedMempoolTx.Encoder | ||
| stateChangeEntry.KeyBytes = connectedMempoolTx.KeyBytes | ||
| stateChangeEntry.UtxoOps = connectedMempoolTx.UtxoOps | ||
| } else { | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Get the cached mempool transaction from the connected mempool map. | |
| if connectedMempoolTx, ok := stateChangeSyncer.ConnectedMempoolTxMap[*event.TxHash]; ok { | |
| // Check to see if the index in question has a "core_state" annotation in its definition. | |
| if !isCoreStateKey(connectedMempoolTx.KeyBytes) { | |
| return | |
| } | |
| stateChangeEntry.Encoder = connectedMempoolTx.Encoder | |
| stateChangeEntry.KeyBytes = connectedMempoolTx.KeyBytes | |
| stateChangeEntry.UtxoOps = connectedMempoolTx.UtxoOps | |
| } else { | |
| return | |
| } | |
| // Get the cached mempool transaction from the connected mempool map. | |
| connectedMempoolTx, ok := stateChangeSyncer.ConnectedMempoolTxMap[*event.TxHash] | |
| if !ok { | |
| return | |
| } | |
| // Check to see if the index in question has a "core_state" annotation in its definition. | |
| if !isCoreStateKey(connectedMempoolTx.KeyBytes) { | |
| return | |
| } | |
| stateChangeEntry.Encoder = connectedMempoolTx.Encoder | |
| stateChangeEntry.KeyBytes = connectedMempoolTx.KeyBytes | |
| stateChangeEntry.UtxoOps = connectedMempoolTx.UtxoOps |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
early return and avoiding else statements where possible.
lib/state_change_syncer.go
Outdated
| if isEncoder, encoder := StateKeyToDeSoEncoder(stateChangeEntry.KeyBytes); isEncoder && encoder != nil { | ||
| encoderType = encoder.GetEncoderType() | ||
| } else { | ||
| // If the keyBytes is not an encoder, then we decode the entry from the key value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a comment that this is expected for things such as LikeEntry where the entry is decoded from the key. the comment is a little off at the end - the phrase "key value" is a little confusing.
lib/state_change_syncer.go
Outdated
|
|
||
| // Encode the state change entry. We encode as a byte array, so the consumer can buffer just the bytes needed | ||
| // to decode this entry when reading from file. | ||
| entryBytes := EncodeToBytes(0, stateChangeEntry, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm we probably need a real block height in this call to EncodeToBytes, right? This gets passed down into RawEncodeWithoutMetadata
lib/state_change_syncer.go
Outdated
| writeBytes := EncodeByteArray(entryBytes) | ||
|
|
||
| decodedStateChangeEntry := &StateChangeEntry{} | ||
| DecodeFromBytes(decodedStateChangeEntry, bytes.NewReader(entryBytes)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to catch the error from DecodeFromBytes?
lib/state_change_syncer.go
Outdated
|
|
||
| // Update the state change file size. | ||
| transactionLen := uint32(len(writeBytes)) | ||
| unflushedBytes.StateChangeBytesSize += transactionLen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add overflow check? We have these SafeUint utils lying around
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 (assuming we keep this field-- see my other comment)
| } | ||
|
|
||
| // Get the relevant deso encoder for this keyBytes. | ||
| var encoderType EncoderType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More comments here. I want to better understand when something hits the first case vs the second. I think it happens whenever we have a field in the db that's a "key only" field with no Entry as a value.
lib/state_change_syncer.go
Outdated
| err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) | ||
|
|
||
| if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second part of this is tricky. I think it's checking whether a block came in while we were mempooling. Regardless, add comment.
| AncestralRecord DeSoEncoder | ||
| // The ancestral record represented in bytes. | ||
| AncestralRecordBytes []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment on when ancestral record bytes is used. I assume in hypersync it's not pupulated for example. Also I think it's only for upsert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, it's only for upsert. We only use ancestral records when syncing mempool entries. When the new block syncs, we need to revert any mempool entries we've stored in the database before applying the committed entries. We do this reversion by reverting to the ancestral record value.
| StateChangeBytes []byte | ||
| StateChangeBytesSize uint64 | ||
| // This is a list of the indexes of the state change bytes that should be written to the state change index file. | ||
| StateChangeOperationIndexes []uint64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give comments on these fields that explains how they work.
|
|
||
| // During blocksync, we flush all entries by index to the state change file once the sync is complete. | ||
| // This is used to track whether this procedure has been initiated. | ||
| BlocksyncCompleteEntriesFlushed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explain the deeper reasons for why we do this optimization, rather than just flushing with each block. We want to give a lot of context for optimizations like these so that we can revisit them intelligently in the future. Otherwise, people are scared to change things like this for fear it will break things in unexpected ways.
cmd/node.go
Outdated
| // Setup chain database | ||
| dbDir := lib.GetBadgerDbPath(node.Config.DataDirectory) | ||
| opts := lib.PerformanceBadgerOptions(dbDir) | ||
| opts := badger.DefaultOptions(dbDir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert to PerformanceBadgerOptions per our conversation
| // Initialize the ancestral records database | ||
| snapshotDirectory := filepath.Join(GetBadgerDbPath(mainDbDirectory), "snapshot") | ||
| snapshotOpts := PerformanceBadgerOptions(snapshotDirectory) | ||
| snapshotOpts := DefaultBadgerOptions(snapshotDirectory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert to PerformanceBadgerOptions per our conversation
scripts/nodes/n0_test
Outdated
| --add-ips=localhost:19000 \ | ||
| --testnet=true \ | ||
| --api-port=18001 \ | ||
| --protocol-port=18000 \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we delete the whole nodes folder here? People should only use the ones in backend from now on. These are outdated and misleading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense to me 👍
lib/block_producer.go
Outdated
| _, _, _, _, err = utxoViewCopy._connectTransaction( | ||
| mempoolTx.Tx, mempoolTx.Hash, int64(mempoolTx.TxSizeBytes), uint32(blockRet.Header.Height), true, | ||
| false /*ignoreUtxos*/) | ||
| _, _, _, _, err = utxoViewCopy._connectTransaction(mempoolTx.Tx, mempoolTx.Hash, int64(mempoolTx.TxSizeBytes), uint32(blockRet.Header.Height), true, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if you could do a pass to fix all the line lengths that the refactoring broke. With vim you can highlight a line and hit "gq" and it does it. But not sure what the shortcut is in other things.
| if innerErr = bc.blockView.FlushToDbWithTxn(txn, blockHeight); innerErr != nil { | ||
| return errors.Wrapf(innerErr, "ProcessBlock: Problem writing utxo view to db on simple add to tip") | ||
| } | ||
| if bc.eventManager != nil && !bc.eventManager.isMempoolManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do me a favor and just add a comment explaining why we put this event trigger here rather than elsewhere. For example, is there a reason why we need to put it in the badger Update() vs outside of it? Just explain it, or explain that the placement doesn't matter, in case someone needs to move things around in the future.
lib/state_change_syncer.go
Outdated
| EncoderBytes: dbEntry.Value, | ||
| } | ||
|
|
||
| fmt.Printf("Flushing entry %d to file\n", ii) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to leave all these prints in here? Fine if they're intentional I'm just flagging it.
No description provided.