Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 8 additions & 29 deletions cl/singlenode/payloadstore/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/lib/pq"
Expand Down Expand Up @@ -37,9 +36,7 @@ func NewPostgresRepository(ctx context.Context, dsn string, logger *slog.Logger)
return nil, fmt.Errorf("failed to create postgres connection pool: %w", err)
}

pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := pool.Ping(pingCtx); err != nil {
if err := pool.Ping(ctx); err != nil {
pool.Close()
l.Error("Failed to close database connection after error", "error", err)
return nil, fmt.Errorf("failed to ping postgres: %w", err)
Expand All @@ -62,9 +59,7 @@ func NewPostgresRepository(ctx context.Context, dsn string, logger *slog.Logger)
CREATE INDEX IF NOT EXISTS idx_block_height ON execution_payloads(block_height);
CREATE INDEX IF NOT EXISTS idx_inserted_at ON execution_payloads(inserted_at);
`
execCtx, execCancel := context.WithTimeout(ctx, 10*time.Second)
defer execCancel()
if _, err := pool.Exec(execCtx, schemaCreationQuery); err != nil {
if _, err := pool.Exec(ctx, schemaCreationQuery); err != nil {
pool.Close()
l.Error("Failed to close database connection after error", "error", err)
return nil, fmt.Errorf("failed to create execution_payloads table: %w", err)
Expand All @@ -81,9 +76,7 @@ func NewPostgresFollower(ctx context.Context, dsn string, logger *slog.Logger) (
return nil, fmt.Errorf("failed to open postgres connection: %w", err)
}

pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := pool.Ping(pingCtx); err != nil {
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("failed to ping postgres: %w", err)
}
Expand All @@ -103,10 +96,7 @@ func (r *PostgresRepository) SavePayload(ctx context.Context, info *types.Payloa
inserted_at = NOW();
`

insertCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

result, err := r.pool.Exec(insertCtx, query, info.PayloadID, info.ExecutionPayload, info.BlockHeight)
result, err := r.pool.Exec(ctx, query, info.PayloadID, info.ExecutionPayload, info.BlockHeight)
if err != nil {
r.logger.Error(
"Failed to insert payload into postgres",
Expand Down Expand Up @@ -146,10 +136,7 @@ func (r *PostgresRepository) GetPayloadsSince(ctx context.Context, sinceHeight u
LIMIT $2;
`

queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

rows, err := r.pool.Query(queryCtx, query, sinceHeight, limit)
rows, err := r.pool.Query(ctx, query, sinceHeight, limit)
if err != nil {
r.logger.Error(
"Failed to query payloads since height",
Expand Down Expand Up @@ -207,11 +194,8 @@ func (r *PostgresRepository) GetPayloadByHeight(ctx context.Context, height uint
WHERE block_height = $1;
`

queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

var payload types.PayloadInfo
err := r.pool.QueryRow(queryCtx, query, height).Scan(
err := r.pool.QueryRow(ctx, query, height).Scan(
&payload.PayloadID,
&payload.ExecutionPayload,
&payload.BlockHeight,
Expand Down Expand Up @@ -249,11 +233,8 @@ func (r *PostgresRepository) GetLatestPayload(ctx context.Context) (*types.Paylo
LIMIT 1;
`

queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

var payload types.PayloadInfo
err := r.pool.QueryRow(queryCtx, query).Scan(
err := r.pool.QueryRow(ctx, query).Scan(
&payload.PayloadID,
&payload.ExecutionPayload,
&payload.BlockHeight,
Expand Down Expand Up @@ -288,11 +269,9 @@ func (r *PostgresRepository) GetLatestHeight(ctx context.Context) (uint64, error
ORDER BY block_height DESC
LIMIT 1;
`
queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

var h int64
err := r.pool.QueryRow(queryCtx, query).Scan(&h)
err := r.pool.QueryRow(ctx, query).Scan(&h)
if err != nil {
if err == sql.ErrNoRows { // Empty table -> new chain
return 0, nil
Expand Down
2 changes: 1 addition & 1 deletion cl/singlenode/singlenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (app *SingleNodeApp) produceBlock() error {

if app.payloadRepo != nil {
// Save payload to repository
saveCtx, saveCancel := context.WithTimeout(app.appCtx, 200*time.Millisecond)
saveCtx, saveCancel := context.WithTimeout(app.appCtx, 30*time.Second)
defer saveCancel()

saveStart := time.Now()
Expand Down
Loading