Skip to content

feat: add builder observer data indexer#802

Merged
kant777 merged 18 commits intomainfrom
feat/builder-observer-indexer
Oct 8, 2025
Merged

feat: add builder observer data indexer#802
kant777 merged 18 commits intomainfrom
feat/builder-observer-indexer

Conversation

@rose2221
Copy link
Copy Markdown
Contributor

@rose2221 rose2221 commented Sep 24, 2025

feat(indexer): builder-observer data ingestor

Adds comprehensive blockchain indexing with MEV relay bid data and BeaconChain metadata for builder observer functionality.

Local Setup (StarRocks in Docker)

# 1) Start StarRocks all-in-one
docker run -d --name starrocks-local \
  -p 9030:9030 -p 8030:8030 -p 8040:8040 \
  starrocks/allin1-ubuntu:latest

# 2) Open MySQL shell into FE
docker exec -it starrocks-local mysql -h 127.0.0.1 -P 9030 -u root

Schema DDL (Updated for Builder Observer)

CREATE DATABASE IF NOT EXISTS indexer_db;
USE indexer_db;

-- Relays configuration
CREATE TABLE IF NOT EXISTS relays (
  relay_id BIGINT NOT NULL,
  name      VARCHAR(100),
  tag       VARCHAR(100),
  base_url  VARCHAR(500),
  is_active TINYINT
)
ENGINE=OLAP
PRIMARY KEY(relay_id)
DISTRIBUTED BY HASH(relay_id) BUCKETS 1
PROPERTIES (
  "replication_num" = "1",
  "enable_persistent_index" = "true"
);


-- Blocks with complete builder observer data
CREATE TABLE IF NOT EXISTS blocks (
  -- Core block data
  slot BIGINT,
  block_number BIGINT,
  timestamp DATETIME,
  proposer_index BIGINT,
  
  -- MEV/Relay data
  winning_relay VARCHAR(100),
  winning_builder_pubkey VARCHAR(200),
  fee_recipient VARCHAR(200),
  producer_reward_eth DECIMAL(20,8),
  
  -- Validator data
  validator_pubkey VARCHAR(200),
  validator_opted_in TINYINT,
  -- Tracking
  created_at DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=OLAP
PRIMARY KEY(slot)
DISTRIBUTED BY HASH(slot) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- MEV relay bids
CREATE TABLE IF NOT EXISTS bids (
  slot BIGINT,
  relay_id BIGINT,
  builder_pubkey VARCHAR(200),
  proposer_pubkey VARCHAR(200),
  proposer_fee_recipient VARCHAR(200),
  value_wei VARCHAR(100),
  block_number BIGINT,
  timestamp_ms BIGINT
) ENGINE=OLAP
PRIMARY KEY(slot, relay_id, builder_pubkey)
DISTRIBUTED BY HASH(slot) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- Application state and configuration storage
CREATE TABLE ingestor_state (
  id TINYINT NOT NULL,
  last_block_number BIGINT
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
  "replication_num" = "1",
  "enable_persistent_index" = "true"
);


## Environment Variables

Required:
```bash
DATABASE_URL=root@tcp(127.0.0.1:9030)/indexer_db
INFURA_RPC=https://mainnet.infura.io/v3/YOUR_KEY
BEACON_BASE=https://beaconcha.in/api/v1  # For BeaconChain.in API

Optional:

ETHERSCAN_API_KEY=your_key_here
HTTP_TIMEOUT=15s
BLOCK_TICK=12s
VALIDATOR_WAIT=1.5s
BACKFILL_EVERY=5m0s

Export and Run

export DATABASE_URL=root@tcp(127.0.0.1:9030)/indexer_db
export INFURA_RPC=https://mainnet.infura.io/v3/YOUR_KEY
export BEACON_BASE=https://beaconcha.in/api/v1

go run indexer/cmd/indexer/main.go

Comment thread tools/indexer/pkg/ethereum/cleint.go Outdated
Comment thread tools/indexer/pkg/database/starrock.go
Comment thread tools/indexer/pkg/database/starrock.go Outdated
ctx2, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

_, err := db.Conn.ExecContext(ctx2, `DELETE FROM ingestor_state WHERE id = 1`)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both these operations should be done atomically in a transaction.

Comment thread tools/indexer/cmd/indexer/main.go Outdated
lastBN, found := db.LoadLastBlockNumber(ctx)
if !found || lastBN == 0 {
log.Printf(" [INIT] No previous state found, checking database for latest block...")
err := db.Conn.QueryRowContext(ctx, `SELECT COALESCE(MAX(block_number),0) FROM blocks`).Scan(&lastBN)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the point of the DB is to mask the underlying storage layer, we shouldnt be accessing the Conn directly. Also if the DB layer is changed, for eg a new table instead of blocks is added, this will also have to be changed. The Conn should be unexported. Any functionality should be provided with the help of functions.

Comment thread tools/indexer/cmd/indexer/main.go Outdated

// Validate required configuration

log.Printf("[INIT] Starting blockchain indexer with StarRocks database")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In all other parts of the code we use slog. This outputs structured json logs which are easier to parse. It has option for text logging also and it can be configured. The other services also use this and we have some util pkg also. Better to start using this in the codebase.

All this manual [COMPONENT] can be added using log tags. This gives us the ability to filter using the tag in the logs instead of doing a text search.

Comment thread tools/indexer/cmd/indexer/main.go Outdated
// Fetch execution block data
ei, err := beacon.FetchCombinedBlockData(httpc, infuraRPC, beaconBase, nextBN)
if err != nil || ei == nil {
log.Printf("⏳ [BLOCK] Block %d not available yet: %v", nextBN, err)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I dont mind all these characters, not sure if anyone will be looking at this if things are running correctly. In which case you will only be looking at them when you have errors. Easier to parse the logs with lesser characters.

Comment thread tools/indexer/cmd/indexer/main.go Outdated

case <-backfillTicker.C:
log.Printf("[BACKFILL] Starting backfill operations...")
backfill.RunAll(ctx, db, httpc, createOptionsFromCLI(c), relays)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is ticker needed for backfill? This operation should only happen once. Also it can run in parallel with the main go routine so it should just be started before we enter this loop.

Comment thread tools/indexer/pkg/ethereum/cleint.go Outdated
httputil "github.com/primev/mev-commit/indexer/pkg/http"
)

const optInABIJSON = `[
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave this comment in the last PR as well. We can use the generated client in contracts-abi. All it needs is ethereum client and it will handle all the data types.

Comment thread tools/indexer/pkg/http/client.go Outdated
},
}
}
func FetchJSONWithRetry(ctx context.Context, httpc *http.Client, url string, out any, attempts int, baseDelay time.Duration) error {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to provide these type of utility functions we cannot just assume this is doing the right thing. Unit tests should be added for this pkg. If not we can use some library that provides this functionality.

Comment thread tools/indexer/pkg/database/starrock.go Outdated

}
func (db *DB) Close() {
db.Conn.Close()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesnt return error?

Also if its the job of the DB to close this, we should not be exposing the conn directly outside.

@kant777
Copy link
Copy Markdown
Contributor

kant777 commented Oct 1, 2025

@rose2221 This PR seems to have lot of unrelated changes. Is it because of rebase? if so, please fix it

@rose2221 rose2221 force-pushed the feat/builder-observer-indexer branch from 99823e3 to 1b503b2 Compare October 3, 2025 18:04
@rose2221 rose2221 force-pushed the feat/builder-observer-indexer branch from 7115450 to 98e511e Compare October 6, 2025 11:41
Comment thread tools/indexer/cmd/start.go Outdated
infuraRPC := c.String(optionInfuraRPC.Name)
beaconBase := c.String(optionBeaconBase.Name)
// Initialize random seed
rand.Seed(time.Now().UnixNano())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Comment thread tools/indexer/cmd/start.go Outdated
"validator_delay", c.Duration("validator-delay"))

// Setup graceful shutdown
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we spoke about this. This is already being done in main.go. This is not needed.

Comment thread tools/indexer/cmd/start.go Outdated

latestBlock, err := ethereum.GetLatestBlockNumber(httpc.HTTPClient, infuraRPC)
if err != nil {
initLogger.Error("failed to get latest block from RPC", "error", err)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there is an error here? lastBN will be initialized with -10

Comment thread tools/indexer/cmd/start.go Outdated
"lookback", c.Int("backfill-lookback"),
"batch", c.Int("backfill-batch"))
go backfill.RunAll(ctx, db, httpc, createOptionsFromCLI(c), relays)
initLogger.Info("[BACKFILL] completed startup backfill")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we spoke this log message is wrong. This does not mean the backfill was completed.

Comment thread tools/indexer/cmd/start.go
Comment thread tools/indexer/cmd/start.go Outdated
// Log block details
initLogger.Info("[BLOCK] processing block", "block", nextBN, "slot", ei.Slot)

if ei.Timestamp != nil {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these log messages are spammy. We should have just 1 log message which prints all the fields.

Comment thread tools/indexer/pkg/database/starrock.go Outdated
TsMS *int64
}

func MustConnect(ctx context.Context, dsn string, maxConns, minConns int) (*DB, error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually the MustXXX terminology is used when function panics if there is error. If you are returning error this can just be called Connect or NewDB.

Comment thread tools/indexer/cmd/start.go Outdated
initLogger.Warn("[BLOCK] not available yet", "block", nextBN, "error", err)
continue
}
fields := []any{
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why all this is required.

The ideal solution to the comment should just be one log message.

log.Info("processing block", "key1", value1, "key2", value2)

Comment thread tools/indexer/cmd/start.go Outdated
}

// final flush
if len(batch) > 0 {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If main context is cancelled, this insert will be called with cancelled context.

Comment thread tools/indexer/cmd/start.go Outdated
return db, nil
}

func closeDatabase(db *database.DB, logger *slog.Logger) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just feels like its blindly broken up into functions for the sake of refactor. For eg. this function is not needed.

Comment thread tools/indexer/cmd/start.go Outdated
}
}

func handleShutdown(ctx context.Context, db *database.DB, lastBN int64, logger *slog.Logger) error {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function and saveBlockProgress are doing the exact same thing.

Comment thread tools/indexer/cmd/start.go Outdated
initLogger.Info("indexer configuration", "lookback", c.Int("backfill-lookback"), "batch", c.Int("backfill-batch"))

// Run backfill if configured
runBackfillIfConfigured(ctx, c, db, httpc, relays, initLogger)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is happening in a single routine. The main loop will not start till the backfill is finished. This is not what we were planning.

func initializeDatabase(ctx context.Context, dbURL string, logger *slog.Logger) (*database.DB, error) {
db, err := database.Connect(ctx, dbURL, 20, 5)
if err != nil {
logger.Error("[DB] connection failed", "error", err)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really sure why the log messages need [SOME KEY]. It can just say DB connection failed. What is achieved with this square brackets? Both cases you will have to search for DB.


logger.Info("[RELAY] loaded active relays", "count", len(relays))
for _, r := range relays {
logger.Info("[RELAY] relay found", "id", r.ID, "url", r.URL)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Printing 'Relay' twice in the log message makes no sense.

Comment thread tools/indexer/cmd/start.go Outdated
logger.Info("[DB] block saved successfully", "block", nextBN)

processBidsForBlock(ctx, db, httpc, relays, ei, logger)
launchAsyncValidatorTasks(ctx, c, db, httpc, ei, beaconBase, logger)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We launch async tasks and save block progress? What if there was an error in the async task?

Comment thread tools/indexer/cmd/start.go Outdated

func launchAsyncValidatorTasks(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) { // Async validator pubkey fetch
if ei.ProposerIdx != nil {
go func(slot int64, proposerIdx int64) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the first go routine is supposed to populate the Validator public key which the second routine is using? What is the point of doing it in separate go routines then? The second one will fail till first one completes. Not sure if I understand this logic.


}

func saveBlockProgress(db *database.DB, blockNum int64, logger *slog.Logger) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So out of the two functions, you removed 1. Now this function is still here. As I mentioned before, just changing code for the sake of it is not the right way. You need to understand first if you even need the function. If you create smaller functions for everything, the readability is hampered. There is a balance.

Comment thread tools/indexer/pkg/backfill/backfill.go
Comment thread tools/indexer/pkg/backfill/backfill.go Outdated
var slotsForBids []int64
var validatorsToCheck []SlotData

for data := range slotChan {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better way is to have the slotChan be a channel of some type

type result struct {
  data SlotData
  err Error
}

This way you need only 1 for loop and you can check

if data.err != nil {
  // handle the error
}

Comment thread tools/indexer/pkg/backfill/backfill.go Outdated
logger.Error("opt-in status update failed", "slot", v.Slot, "error", uerr)
}
}
if err := recentBids(ctx, db, httpc, relays, slotsForBids); err != nil {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So again, I am not sure you understood what I meant when I asked you to use channels. The idea was that we do it one by one, so get missing slot, then get the validator info and opted in status for that slot and then get the bids for that slot, then move onto the next slot.

You have just changed this validator check to be sequential but then the bids again use batches. This does not make any sense. Either you do one by one or do batch like you were doing before.

}
}

for _, r := range relays {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This for loop should be inside the above for loop no?

Comment thread tools/indexer/cmd/start.go Outdated

}

func launchAsyncValidatorTasks(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, ei *beacon.ExecInfo, beaconBase string, logger *slog.Logger) error { // Async validator pubkey fetch
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer async. Naming should be changed.

}
return v.Elem().Interface()
}
func processNextBlock(ctx context.Context, c *cli.Context, db *database.DB, httpc *retryablehttp.Client, relays []relay.Row, infuraRPC, beaconBase string, lastBN int64, logger *slog.Logger) int64 {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So these 3 functions are duplicates.

processNextBlock
processBidsForBlock
launchValidatorTask

The backfill does these exact 3 things. Only difference is that in the forward operation, the block number is obtained by querying the current height and in cast of backfill it is obtained by querying DB for missing slots.

So ideally you could have 1 file with these 3 functions:

ProcessBlockData(BlockNumber)
ProcessRelayBidData(BlockNumber)
ProcessValidatorOptInData(BlockNumber, validators)

Both backfill and forward operations can just use these functions. Only the logic to get block number will be different.

@kant777 kant777 merged commit e67c6d7 into main Oct 8, 2025
7 of 9 checks passed
@kant777 kant777 deleted the feat/builder-observer-indexer branch October 8, 2025 20:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants