Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 167 additions & 31 deletions tools/preconf-rpc/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@ package rpcserver
import (
"bytes"
"context"
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"strings"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
)

const (
defaultTimeout = 5 * time.Second
defaultMaxBodySize = 30 * 1024 * 1024 // 30 MB
cacheSize = 10000

CodeParseError = -32700
CodeInvalidRequest = -32600
Expand Down Expand Up @@ -59,19 +66,62 @@ type jsonRPCError struct {
Data *any `json:"data,omitempty"`
}

var cacheMethods = map[string]bool{
"eth_call": true,
"eth_getCode": true,
"eth_getStorageAt": true,
"eth_feeHistory": true,
"eth_gasPrice": true,
"eth_getLogs": true,
"net_version": true,
}

type cacheEntry struct {
until time.Time
data json.RawMessage
}

func cacheKey(method string, params []any) string {
b, _ := json.Marshal(params)
h := sha1.Sum(append([]byte(method), b...))
return string(h[:])
}

type JSONRPCServer struct {
rwLock sync.RWMutex
methods map[string]methodHandler
proxyURL string
logger *slog.Logger
rwLock sync.RWMutex
methods map[string]methodHandler
proxyURL string
httpClient *http.Client
cache *lru.Cache[string, cacheEntry]
logger *slog.Logger
}

func NewJSONRPCServer(proxyURL string, logger *slog.Logger) *JSONRPCServer {
func NewJSONRPCServer(proxyURL string, logger *slog.Logger) (*JSONRPCServer, error) {
cache, err := lru.New[string, cacheEntry](cacheSize)
if err != nil {
return nil, err
}
return &JSONRPCServer{
proxyURL: proxyURL,
methods: make(map[string]methodHandler),
logger: logger,
}
httpClient: &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 256,
MaxIdleConnsPerHost: 256,
IdleConnTimeout: 90 * time.Second,
ForceAttemptHTTP2: true,
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 5 * time.Second,
},
Timeout: 15 * time.Second,
},
cache: cache,
logger: logger,
}, nil
}

func (s *JSONRPCServer) RegisterHandler(method string, handler methodHandler) {
Expand Down Expand Up @@ -118,14 +168,53 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

start := time.Now()
defer func() {
s.logger.Info("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start))
s.logger.Debug("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start))
}()

if cacheMethods[req.Method] {
if stubbed, resp := maybeStubERC20Meta(req.Method, req.Params); stubbed {
s.writeResponse(w, req.ID, &resp)
return
}
key := cacheKey(req.Method, req.Params)
if entry, ok := s.cache.Get(key); ok && time.Now().Before(entry.until) {
s.logger.Debug("Cache hit", "method", req.Method, "id", req.ID)
s.writeResponse(w, req.ID, &entry.data)
return
}
}

handleProxy := func() {
out, statusCode, err := s.proxyRequest(r.Context(), body)
if err != nil {
http.Error(w, err.Error(), statusCode)
return
}
var resp jsonRPCResponse
if err := json.Unmarshal(out, &resp); err != nil {
http.Error(w, "Failed to parse proxy response", http.StatusInternalServerError)
return
}
if resp.Error != nil {
s.writeError(w, req.ID, resp.Error.Code, resp.Error.Message)
return
}
if cacheMethods[req.Method] && resp.Result != nil {
key := cacheKey(req.Method, req.Params)
s.cache.Add(key, cacheEntry{
until: time.Now().Add(pickTTL(req.Method, *resp.Result)),
data: *resp.Result,
})
s.logger.Debug("Cache store", "method", req.Method, "id", req.ID)
}
s.writeResponse(w, req.ID, resp.Result)
}

s.rwLock.RLock()
handler, ok := s.methods[req.Method]
s.rwLock.RUnlock()
if !ok {
s.proxyRequest(w, body)
handleProxy()
return
}

Expand All @@ -141,7 +230,7 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.writeError(w, req.ID, CodeCustomError, err.Error())
return
case proxy:
s.proxyRequest(w, body)
handleProxy()
return
case resp == nil:
s.writeError(w, req.ID, CodeCustomError, "No response")
Expand Down Expand Up @@ -187,44 +276,91 @@ func (s *JSONRPCServer) writeError(w http.ResponseWriter, id any, code int, mess
}
}

func (s *JSONRPCServer) proxyRequest(w http.ResponseWriter, body []byte) {
client := &http.Client{
Timeout: defaultTimeout,
}
req, err := http.NewRequest(http.MethodPost, s.proxyURL, bytes.NewReader(body))
func (s *JSONRPCServer) proxyRequest(ctx context.Context, body []byte) ([]byte, int, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.proxyURL, bytes.NewReader(body))
if err != nil {
http.Error(w, "Failed to create proxy request", http.StatusInternalServerError)
return
return nil, http.StatusInternalServerError, fmt.Errorf("failed to create proxy request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

s.logger.Debug("Proxying request", "url", s.proxyURL, "body", string(body))
resp, err := client.Do(req)
resp, err := s.httpClient.Do(req)
if err != nil {
http.Error(w, "Failed to execute proxy request", http.StatusInternalServerError)
return
return nil, http.StatusInternalServerError, fmt.Errorf("proxy request failed: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()

setCorsHeaders(w)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
rdr := io.LimitReader(resp.Body, defaultMaxBodySize)
n, err := io.Copy(w, rdr)
if err != nil {
http.Error(w, "Failed to copy proxy response", http.StatusInternalServerError)
return
if resp.StatusCode != http.StatusOK {
return nil, resp.StatusCode, fmt.Errorf("proxy request returned status %d", resp.StatusCode)
}
if n == 0 {
http.Error(w, "Empty response from proxy", http.StatusInternalServerError)
return

out, err := io.ReadAll(io.LimitReader(resp.Body, defaultMaxBodySize))
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("failed to read proxy response: %w", err)
}

return out, resp.StatusCode, nil
}

func setCorsHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
}

// short-circuit a few very common ERC-20 metadata calls
// selectors: symbol 0x95d89b41, decimals 0x313ce567, name 0x06fdde03
func maybeStubERC20Meta(method string, params []any) (stubbed bool, resp json.RawMessage) {
if method != "eth_call" || len(params) == 0 {
return false, nil
}
// parse minimal: [{"to":"0x..","data":"0x...."}, <blockTag?>]
callObj, ok := params[0].(map[string]any)
if !ok {
return false, nil
}
data, _ := callObj["data"].(string)
switch strings.ToLower(data) {
case "0x95d89b41": // symbol()
// return ABI-encoded string "TOKEN"
enc := "0x" +
"0000000000000000000000000000000000000000000000000000000000000020" + // offset
"0000000000000000000000000000000000000000000000000000000000000005" + // len
"544f4b454e000000000000000000000000000000000000000000000000000000" // "TOKEN"
return true, json.RawMessage(`"` + enc + `"`)
case "0x313ce567": // decimals()
enc := "0x" + "0000000000000000000000000000000000000000000000000000000000000012" // 18
return true, json.RawMessage(`"` + enc + `"`)
case "0x06fdde03": // name()
enc := "0x" +
"0000000000000000000000000000000000000000000000000000000000000020" +
"0000000000000000000000000000000000000000000000000000000000000005" +
"546f6b656e000000000000000000000000000000000000000000000000000000"
return true, json.RawMessage(`"` + enc + `"`)
default:
return false, nil
}
}

func pickTTL(method string, params json.RawMessage) time.Duration {
switch method {
case "net_version":
return 24 * time.Hour
case "eth_getCode":
return 24 * time.Hour
case "eth_feeHistory":
return 3 * time.Second
case "eth_call":
// if block tag provided and hex number → immutable
if strings.HasSuffix(string(params), "\"") { // cheap check
if strings.Contains(string(params), "\"0x") && !strings.Contains(string(params), "\"latest\"") {
return 24 * time.Hour
}
}
return 1 * time.Second
default:
Comment thread
aloknerurkar marked this conversation as resolved.
return 2 * time.Second
}
}
12 changes: 6 additions & 6 deletions tools/preconf-rpc/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ const (
)

const (
blockTime = 12 // seconds, typical Ethereum block time
bidTimeout = 3 * time.Second // timeout for bid operations
defaultConfidence = 90 // default confidence level for the next block
confidenceSecondAttempt = 95 // confidence level for the second attempt
confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts
transactionTimeout = 10 * time.Minute // timeout for transaction processing
blockTime = 12 // seconds, typical Ethereum block time
bidTimeout = 100 * time.Millisecond // timeout for bid operations
defaultConfidence = 90 // default confidence level for the next block
confidenceSecondAttempt = 95 // confidence level for the second attempt
confidenceSubsequentAttempts = 99 // confidence level for subsequent attempts
transactionTimeout = 10 * time.Minute // timeout for transaction processing
)

var (
Expand Down
5 changes: 4 additions & 1 deletion tools/preconf-rpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,13 @@ func New(config *Config) (*Service, error) {
healthChecker.Register(health.CloseChannelHealthCheck("BidderService", bidderDone))
s.closers = append(s.closers, channelCloser(bidderDone))

rpcServer := rpcserver.NewJSONRPCServer(
rpcServer, err := rpcserver.NewJSONRPCServer(
config.L1RPCUrls[0],
config.Logger.With("module", "rpcserver"),
)
if err != nil {
return nil, fmt.Errorf("failed to create RPC server: %w", err)
}

bidpricer, err := pricer.NewPricer(config.PricerAPIKey, config.Logger.With("module", "bidpricer"))
if err != nil {
Expand Down
Loading