Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 83 additions & 17 deletions cmd/shisui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"fmt"
"net"
"net/http"
"os/signal"
"path"
"slices"
"strings"
"syscall"

"os"

Expand Down Expand Up @@ -45,6 +47,14 @@ type Config struct {
Networks []string
}

type Client struct {
DiscV5API *discover.DiscV5API
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DiscV5API is not needed to close. In fact, all network use the same UDPv5, every network will stop the discv5 when call the Stop method

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since DiscV5 not managed by sub networks, I would like to suggest we don't close discV5 in sub networks close function and close by the outside management object.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to move the closure of DiskV5 from PortalProtocol, where it was originally, to main, but it broke a test and may potentially break other parts of the code that calls PortalProtocol.Close().

DiscV5 close uses a sync.Once struct and can safely be closed many times:

func (t *UDPv5) Close() {

HistoryNetwork *history.HistoryNetwork
BeaconNetwork *beacon.BeaconNetwork
StateNetwork *state.StateNetwork
Server *http.Server
}

var app = flags.NewApp("the go-portal-network command line interface")

var (
Expand Down Expand Up @@ -85,6 +95,9 @@ func shisui(ctx *cli.Context) error {

setDefaultLogger(*config)

clientChan := make(chan *Client, 1)
go handlerInterrupt(clientChan)

addr, err := net.ResolveUDPAddr("udp", config.Protocol.ListenAddr)
if err != nil {
return err
Expand All @@ -94,7 +107,7 @@ func shisui(ctx *cli.Context) error {
return err
}

return startPortalRpcServer(*config, conn, config.RpcAddr)
return startPortalRpcServer(*config, conn, config.RpcAddr, clientChan)
}

func setDefaultLogger(config Config) {
Expand All @@ -105,7 +118,51 @@ func setDefaultLogger(config Config) {
log.SetDefault(defaultLogger)
}

func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) error {
func handlerInterrupt(clientChan <-chan *Client) {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(interrupt)

<-interrupt
log.Warn("Closing Shisui gracefully (type CTRL-C again to force quit)")

go func() {
if len(clientChan) == 0 {
log.Warn("Waiting for the client to start...")
}
c := <-clientChan
c.closePortalRpcServer()
}()

<-interrupt
os.Exit(1)
}

func (cli *Client) closePortalRpcServer() {
if cli.HistoryNetwork != nil {
log.Info("Closing history network...")
cli.HistoryNetwork.Stop()
}
if cli.BeaconNetwork != nil {
log.Info("Closing beacon network...")
cli.BeaconNetwork.Stop()
}
if cli.StateNetwork != nil {
log.Info("Closing state network...")
cli.StateNetwork.Stop()
}
log.Info("Closing Database...")
cli.DiscV5API.DiscV5.LocalNode().Database().Close()
log.Info("Closing UDPv5 protocol...")
cli.DiscV5API.DiscV5.Close()
log.Info("Closing servers...")
cli.Server.Close()
os.Exit(1)
}

func startPortalRpcServer(config Config, conn discover.UDPConn, addr string, clientChan chan<- *Client) error {
client := &Client{}

discV5, localNode, err := initDiscV5(config, conn)
if err != nil {
return err
Expand All @@ -117,6 +174,7 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err
if err != nil {
return err
}
client.DiscV5API = discV5API

api := &web3.API{}
err = server.RegisterName("web3", api)
Expand All @@ -130,20 +188,25 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err
if err != nil {
return err
}
client.HistoryNetwork = historyNetwork
}

var beaconNetwork *beacon.BeaconNetwork
if slices.Contains(config.Networks, portalwire.Beacon.Name()) {
err = initBeacon(config, server, conn, localNode, discV5)
beaconNetwork, err = initBeacon(config, server, conn, localNode, discV5)
if err != nil {
return err
}
client.BeaconNetwork = beaconNetwork
}

var stateNetwork *state.StateNetwork
if slices.Contains(config.Networks, portalwire.State.Name()) {
err = initState(config, server, conn, localNode, discV5)
stateNetwork, err = initState(config, server, conn, localNode, discV5)
if err != nil {
return err
}
client.StateNetwork = stateNetwork
}

ethapi := &ethapi.API{
Expand All @@ -160,6 +223,9 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err
Addr: addr,
Handler: server,
}
client.Server = httpServer

clientChan <- client
return httpServer.ListenAndServe()
}

Expand Down Expand Up @@ -223,15 +289,15 @@ func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, local
return historyNetwork, historyNetwork.Start()
}

func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error {
func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) (*beacon.BeaconNetwork, error) {
dbPath := path.Join(config.DataDir, "beacon")
err := os.MkdirAll(dbPath, 0755)
if err != nil {
return err
return nil, err
}
sqlDb, err := sql.Open("sqlite3", path.Join(dbPath, "beacon.sqlite"))
if err != nil {
return err
return nil, err
}

contentStorage, err := beacon.NewBeaconStorage(storage.PortalStorageConfig{
Expand All @@ -241,32 +307,32 @@ func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localN
Spec: configs.Mainnet,
})
if err != nil {
return err
return nil, err
}
contentQueue := make(chan *discover.ContentElement, 50)

protocol, err := discover.NewPortalProtocol(config.Protocol, portalwire.Beacon, config.PrivateKey, conn, localNode, discV5, contentStorage, contentQueue)

if err != nil {
return err
return nil, err
}
portalApi := discover.NewPortalAPI(protocol)

beaconAPI := beacon.NewBeaconNetworkAPI(portalApi)
err = server.RegisterName("portal", beaconAPI)
if err != nil {
return err
return nil, err
}

beaconNetwork := beacon.NewBeaconNetwork(protocol)
return beaconNetwork.Start()
return beaconNetwork, beaconNetwork.Start()
}

func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error {
func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) (*state.StateNetwork, error) {
networkName := portalwire.State.Name()
db, err := history.NewDB(config.DataDir, networkName)
if err != nil {
return err
return nil, err
}
contentStorage, err := history.NewHistoryStorage(storage.PortalStorageConfig{
StorageCapacityMB: config.DataCapacity,
Expand All @@ -275,24 +341,24 @@ func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNo
NetworkName: networkName,
})
if err != nil {
return err
return nil, err
}
contentQueue := make(chan *discover.ContentElement, 50)

protocol, err := discover.NewPortalProtocol(config.Protocol, portalwire.State, config.PrivateKey, conn, localNode, discV5, contentStorage, contentQueue)

if err != nil {
return err
return nil, err
}
api := discover.NewPortalAPI(protocol)
stateNetworkAPI := state.NewStateNetworkAPI(api)
err = server.RegisterName("portal", stateNetworkAPI)
if err != nil {
return err
return nil, err
}
client := rpc.DialInProc(server)
historyNetwork := state.NewStateNetwork(protocol, client)
return historyNetwork.Start()
return historyNetwork, historyNetwork.Start()
}

func getPortalConfig(ctx *cli.Context) (*Config, error) {
Expand Down