Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions explorer/lib/explorer/periodically.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Explorer.Periodically do
require Logger
alias Phoenix.PubSub
use GenServer

Expand Down Expand Up @@ -29,6 +30,7 @@ defmodule Explorer.Periodically do
# Gets previous unverified batches and checks if they were verified
run_every_n_iterations = 8
new_count = rem(count + 1, run_every_n_iterations)

if new_count == 0 do
Task.start(&process_unverified_batches/0)
end
Expand Down Expand Up @@ -63,33 +65,35 @@ defmodule Explorer.Periodically do
{:ok, lock} ->
"Processing batch: #{batch.merkle_root}" |> IO.inspect()

{batch_changeset, proofs} =
batch
|> Utils.extract_info_from_data_pointer()
|> Batches.generate_changesets()

Batches.insert_or_update(batch_changeset, proofs)
|> case do
{:ok, _} ->
PubSub.broadcast(Explorer.PubSub, "update_views", %{
eth_usd:
case EthConverter.get_eth_price_usd() do
{:ok, eth_usd_price} -> eth_usd_price
{:error, _error} -> :empty
end
})

{:error, error} ->
IO.puts("Some error in DB operation, not broadcasting update_views")
IO.inspect(error)

# no changes in DB
nil ->
nil
with {:ok, batch_info} <- Utils.extract_info_from_data_pointer(batch),
{batch_changeset, proofs} <- Batches.generate_changesets(batch_info) do
Batches.insert_or_update(batch_changeset, proofs)
|> case do
{:ok, _} ->
PubSub.broadcast(Explorer.PubSub, "update_views", %{
eth_usd:
case EthConverter.get_eth_price_usd() do
{:ok, eth_usd_price} -> eth_usd_price
{:error, _error} -> :empty
end
})

{:error, error} ->
IO.puts("Some error in DB operation, not broadcasting update_views")
IO.inspect(error)

# no changes in DB
nil ->
nil
end

"Done processing batch: #{batch.merkle_root}" |> IO.inspect()
Mutex.release(BatchMutex, lock)
else
{:error, {:http_error, reason}} ->
Logger.error("Error when procesing request body: #{inspect(reason)}")
# Maybe delete the batch
end

"Done processing batch: #{batch.merkle_root}" |> IO.inspect()
Mutex.release(BatchMutex, lock)
end
end

Expand Down
30 changes: 16 additions & 14 deletions explorer/lib/explorer_web/live/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,16 @@ defmodule Utils do
IO.inspect("Calculating proof hashes")

batch_json
|> Enum.map(
fn s3_object ->
:crypto.hash(:sha3_256, s3_object["proof"])
end)
|> Enum.map(fn s3_object ->
:crypto.hash(:sha3_256, s3_object["proof"])
end)
end

def calculate_proof_hashes({:error, reason}) do
IO.inspect("Error calculating proof hashes: #{inspect(reason)}")
[]
end
def calculate_proof_hashes(error), do: error

def fetch_batch_data_pointer(batch_data_pointer) do
case Finch.build(:get, batch_data_pointer) |> Finch.request(Explorer.Finch) do
case Finch.build(:get, batch_data_pointer)
|> Finch.request(Explorer.Finch, request_timeout: 10_000) do
{:ok, %Finch.Response{status: 200, body: body}} ->
case Jason.decode(body) do
{:ok, json} -> {:ok, json}
Expand All @@ -157,7 +154,7 @@ defmodule Utils do
def extract_info_from_data_pointer(%BatchDB{} = batch) do
IO.inspect("Extracting batch's proofs info: #{batch.merkle_root}")
# only get from s3 if not already in DB
proof_hashes =
result =
case Proofs.get_proofs_from_batch(%{merkle_root: batch.merkle_root}) do
nil ->
IO.inspect("Fetching from S3")
Expand All @@ -169,11 +166,16 @@ defmodule Utils do
proof_hashes ->
# already processed and stored the S3 data
IO.inspect("Fetching from DB")
proof_hashes
{:ok, proof_hashes}
end

batch
|> Map.put(:proof_hashes, proof_hashes)
|> Map.put(:amount_of_proofs, proof_hashes |> Enum.count())
with {:ok, proof_hashes} <- result do
batch_info =
batch
|> Map.put(:proof_hashes, proof_hashes)
|> Map.put(:amount_of_proofs, proof_hashes |> Enum.count())

{:ok, batch_info}
end
end
end
203 changes: 203 additions & 0 deletions operator/pkg/operator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package operator_test

import (
"context"
"crypto/ecdsa"
"crypto/rand"
"fmt"
"math/big"
"net/http"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/params"
contractAlignedLayerServiceManager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
"golang.org/x/net/http2"
)

var (
alicePrivateKey = func() []byte {
key, err := hexutil.Decode("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80")
if err != nil {
panic(err)
}
return key
}()
aliceAddress = func() common.Address {
addr, err := common.NewMixedcaseAddressFromString("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266")
if err != nil {
panic(err)
}
return addr.Address()
}()
alignedLayerServiceManagerAddress = func() common.Address {
addr, err := common.NewMixedcaseAddressFromString("0x1613beB3B2C4f22Ee086B2b38C1476A3cE7f78E8")
if err != nil {
panic(err)
}
return addr.Address()
}()
)

func NewAnvilClient(t *testing.T) *ethclient.Client {
client, err := ethclient.Dial("http://localhost:8545")
if err != nil {
t.Fatalf("could not connect to anvil: %s", err)
}
return client
}

type AlignedUser struct {
Signer gethtypes.Signer
Client *ethclient.Client
PrivateKey *ecdsa.PrivateKey
UserAddress common.Address
Name string
}

func NewAlignedUser(t *testing.T, privateKey []byte, client *ethclient.Client, addr common.Address, name string) *AlignedUser {
key, err := crypto.ToECDSA(privateKey)
if err != nil {
t.Fatalf("could not create private key: %s", err)
}
return &AlignedUser{
Signer: gethtypes.NewCancunSigner(big.NewInt(31337)),
Client: client,
PrivateKey: key,
UserAddress: addr,
Name: name,
}
}

func (u *AlignedUser) SendTransaction(tx *gethtypes.Transaction) error {
signedTx, err := gethtypes.SignTx(tx, u.Signer, u.PrivateKey)
if err != nil {
return err
}
return u.Client.SendTransaction(context.TODO(), signedTx)
}

func (u *AlignedUser) getNonce() *big.Int {
nonce, err := u.Client.NonceAt(context.Background(), u.UserAddress, nil)
if err != nil {
panic(err)
}
return new(big.Int).SetUint64(nonce)
}

func CreateNewTask(t *testing.T, user *AlignedUser, contractAddress common.Address, merkleRoot [32]byte, dataPointer string) {
serviceManager, err := contractAlignedLayerServiceManager.NewContractAlignedLayerServiceManager(
contractAddress,
user.Client,
)
if err != nil {
t.Fatalf("could not create service manager: %s", err)
}
t.Logf("ServiceManager created")

createTx, err := serviceManager.CreateNewTask(
&bind.TransactOpts{
From: user.UserAddress,
Nonce: user.getNonce(),
Signer: func(addr common.Address, tx *gethtypes.Transaction) (*gethtypes.Transaction, error) {
return gethtypes.SignTx(tx, user.Signer, user.PrivateKey)
},
Value: new(big.Int).SetUint64(1),
GasLimit: params.GenesisGasLimit / 2,
},
merkleRoot,
dataPointer,
)
if err != nil {
t.Fatalf("could not create task: %s", err)
}

t.Logf("New task created")

i := 0
r := new(gethtypes.Receipt)
for {
r, err = user.Client.TransactionReceipt(context.TODO(), createTx.Hash())
if i > 10 {
return
}
i++
if err != nil {
if err.Error() != "not found" {
t.Fatal(err)
}
time.Sleep(1 * time.Second)
continue
}
t.Logf("Receipt Status: %v", r.Status)
if r.Status != 0 {
break
}
time.Sleep(1 * time.Second)
}

}

func processGzipBomb(t *testing.T) {
client := NewAnvilClient(t)
alice := NewAlignedUser(t, alicePrivateKey, client, aliceAddress, "alice")
t.Logf("New user %v message", alice)

var randHash [32]byte
if _, err := rand.Read(randHash[:]); err != nil {
t.Fatalf("could not generate random hash: %s", err)
}

CreateNewTask(t, alice, alignedLayerServiceManagerAddress, randHash, fmt.Sprintf("http://localhost:1515/%x", randHash))
}

func startTestServerOOM(t *testing.T, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

server := &http.Server{
Addr: ":1515",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handleRequestOOM(t, w, r)
}),
}

if err := http2.ConfigureServer(server, &http2.Server{}); err != nil {
t.Fatalf("could not configure server for http2: %s", err)
}
t.Logf("Starting the server")

t.Fatal(server.ListenAndServe())
}

func handleRequestOOM(t *testing.T, w http.ResponseWriter, r *http.Request) {
t.Logf("Received request: %s %s", r.Method, r.URL)
switch r.Method {
case http.MethodHead:
case http.MethodGet:
w.Header().Set("Content-Encoding", "gzip")
for {
if _, err := w.Write([]byte("infinite content")); err != nil {
t.Logf("Finishing test")
return
}
w.(http.Flusher).Flush()
}
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}

func TestGetBatchExplorerOOM(t *testing.T) {
wg := sync.WaitGroup{}
go startTestServerOOM(t, &wg)
processGzipBomb(t)
wg.Wait()
}