Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/gateway-conformance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
kuboNodeMultiaddr=$(ipfs --api=/ip4/127.0.0.1/tcp/5001 swarm addrs local --id | head -n 1)

# run gw
./rainbow --routing=http://127.0.0.1:8080 --peering=$kuboNodeMultiaddr &
./rainbow --http-routers=http://127.0.0.1:8080 --dht-routing=off --peering=$kuboNodeMultiaddr &
working-directory: rainbow

# 6. Run the gateway-conformance tests
Expand Down
2 changes: 2 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

Expand Down
81 changes: 43 additions & 38 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,31 @@ Generate an identity seed and launch a gateway:
Name: "seed",
Value: "",
EnvVars: []string{"RAINBOW_SEED"},
Usage: "Seed to derive peerID from. Generate with gen-seed. Needs --seed-index. Best to use $CREDENTIALS_DIRECTORY/seed or $RAINBOW_DATADIR/seed.",
Usage: "Seed to derive peerID from. Generate with gen-seed. Needs --seed-index. Best to use $CREDENTIALS_DIRECTORY/seed or $RAINBOW_DATADIR/seed",
},
&cli.IntFlag{
Name: "seed-index",
Value: -1,
EnvVars: []string{"RAINBOW_SEED_INDEX"},
Usage: "Index to derivate the peerID (needs --seed)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "gateway-domains",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_GATEWAY_DOMAINS"},
Usage: "Domains with flat path gateway, no Origin isolation. Comma-separated list.",
Usage: "Domains with flat path gateway, no Origin isolation (comma-separated)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "subdomain-gateway-domains",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_SUBDOMAIN_GATEWAY_DOMAINS"},
Usage: "Domains with subdomain-based Origin isolation. Comma-separated list.",
Usage: "Domains with subdomain-based Origin isolation (comma-separated)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "trustless-gateway-domains",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_TRUSTLESS_GATEWAY_DOMAINS"},
Usage: "Domains limited to trustless, verifiable response types. Comma-separated list.",
Usage: "Domains limited to trustless, verifiable response types (comma-separated)",
},
&cli.StringFlag{
Name: "gateway-listen-address",
Expand All @@ -123,13 +123,13 @@ Generate an identity seed and launch a gateway:
Name: "gc-interval",
Value: time.Minute * 60,
EnvVars: []string{"RAINBOW_GC_INTERVAL"},
Usage: "The interval between automatic GC runs. Set 0 to disable.",
Usage: "The interval between automatic GC runs. Set 0 to disable",
},
&cli.Float64Flag{
Name: "gc-threshold",
Value: 0.3,
EnvVars: []string{"RAINBOW_GC_THRESHOLD"},
Usage: "Percentage of how much of the disk free space must be available.",
Usage: "Percentage of how much of the disk free space must be available",
},
&cli.IntFlag{
Name: "connmgr-low",
Expand Down Expand Up @@ -167,26 +167,41 @@ Generate an identity seed and launch a gateway:
EnvVars: []string{"RAINBOW_MAX_FD"},
Usage: "Maximum number of file descriptors. Defaults to 50% of the process' limit",
},
&cli.StringSliceFlag{
Name: "http-routers",
Value: cli.NewStringSlice(cidContactEndpoint),
EnvVars: []string{"RAINBOW_HTTP_ROUTERS"},
Usage: "HTTP servers with /routing/v1 endpoints to use for delegated routing (comma-separated)",
},
&cli.StringFlag{
Name: "routing",
Value: "",
Usage: "RoutingV1 Endpoint (otherwise Amino DHT and cid.contact is used)",
Name: "dht-routing",
Value: "accelerated",
EnvVars: []string{"RAINBOW_DHT_ROUTING"},
Usage: "Use the Amino DHT for routing. Options are 'accelerated', 'standard' and 'off'",
Action: func(ctx *cli.Context, s string) error {
switch DHTRouting(s) {
case DHTAccelerated, DHTStandard, DHTOff:
return nil
default:
return errors.New("invalid value for --dht-routing: use 'accelerated', 'standard' or 'off'")
}
},
},
&cli.BoolFlag{
Name: "dht-share-host",
Name: "dht-shared-host",
Value: false,
EnvVars: []string{"RAINBOW_DHT_SHARED_HOST"},
Usage: "If false, DHT operations are run using an ephemeral peer, separate from the main one",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "denylists",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_DENYLISTS"},
Usage: "Denylist HTTP subscriptions (comma-separated). Must be append-only denylists",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "peering",
Value: "",
Value: cli.NewStringSlice(),
EnvVars: []string{"RAINBOW_PEERING"},
Usage: "Multiaddresses of peers to stay connected to (comma-separated)",
},
Expand All @@ -200,7 +215,7 @@ Generate an identity seed and launch a gateway:
Name: "ipns-max-cache-ttl",
Value: 0,
EnvVars: []string{"RAINBOW_IPNS_MAX_CACHE_TTL"},
Usage: "Optional cap on caching duration for IPNS/DNSLink lookups. Set to 0 to respect original TTLs.",
Usage: "Optional cap on caching duration for IPNS/DNSLink lookups. Set to 0 to respect original TTLs",
},
}

Expand Down Expand Up @@ -277,7 +292,7 @@ share the same seed as long as the indexes are different.
}

var peeringAddrs []peer.AddrInfo
for _, maStr := range getCommaSeparatedList(cctx.String("peering")) {
for _, maStr := range cctx.StringSlice("peering") {
ai, err := peer.AddrInfoFromString(maStr)
if err != nil {
return err
Expand All @@ -288,19 +303,20 @@ share the same seed as long as the indexes are different.
cfg := Config{
DataDir: ddir,
BlockstoreType: cctx.String("blockstore"),
GatewayDomains: getCommaSeparatedList(cctx.String("gateway-domains")),
SubdomainGatewayDomains: getCommaSeparatedList(cctx.String("subdomain-gateway-domains")),
TrustlessGatewayDomains: getCommaSeparatedList(cctx.String("trustless-gateway-domains")),
GatewayDomains: cctx.StringSlice("gateway-domains"),
SubdomainGatewayDomains: cctx.StringSlice("subdomain-gateway-domains"),
TrustlessGatewayDomains: cctx.StringSlice("trustless-gateway-domains"),
ConnMgrLow: cctx.Int("connmgr-low"),
ConnMgrHi: cctx.Int("connmgr-high"),
ConnMgrGrace: cctx.Duration("connmgr-grace"),
MaxMemory: cctx.Uint64("max-memory"),
MaxFD: cctx.Int("max-fd"),
InMemBlockCache: cctx.Int64("inmem-block-cache"),
RoutingV1: cctx.String("routing"),
RoutingV1Endpoints: cctx.StringSlice("http-routers"),
DHTRouting: DHTRouting(cctx.String("dht-routing")),
DHTSharedHost: cctx.Bool("dht-shared-host"),
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: getCommaSeparatedList(cctx.String("denylists")),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
GCInterval: cctx.Duration("gc-interval"),
GCThreshold: cctx.Float64("gc-threshold"),
Expand Down Expand Up @@ -463,17 +479,6 @@ func writeAllGoroutineStacks(w io.Writer) error {
return err
}

func getCommaSeparatedList(val string) []string {
if val == "" {
return nil
}
items := strings.Split(val, ",")
for i, item := range items {
items[i] = strings.TrimSpace(item)
}
return items
}

func printIfListConfigured(message string, list []string) {
if len(list) > 0 {
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
Expand Down
114 changes: 66 additions & 48 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ func init() {
}
}

const ipniFallbackEndpoint = "https://cid.contact"
const cidContactEndpoint = "https://cid.contact"

type DHTRouting string

const (
DHTAccelerated DHTRouting = "accelerated"
DHTStandard DHTRouting = "standard"
DHTOff DHTRouting = "off"
)

type Node struct {
vs routing.ValueStore
Expand Down Expand Up @@ -97,7 +105,8 @@ type Config struct {
GatewayDomains []string
SubdomainGatewayDomains []string
TrustlessGatewayDomains []string
RoutingV1 string
RoutingV1Endpoints []string
DHTRouting DHTRouting
DHTSharedHost bool
IpnsMaxCacheTTL time.Duration

Expand Down Expand Up @@ -176,9 +185,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
)
blkst = blockstore.NewIdStore(blkst)

var pr routing.PeerRouting
var vs routing.ValueStore
var cr routing.ContentRouting
var router routing.Routing

// Increase per-host connection pool since we are making lots of concurrent requests.
httpClient := &http.Client{
Expand All @@ -201,17 +208,21 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
if cfg.RoutingV1 != "" {
routingClient, err := delegatedHTTPContentRouter(cfg.RoutingV1, routingv1client.WithStreamResultsRequired(), routingv1client.WithHTTPClient(httpClient))
var routingV1Routers []routing.Routing
for _, endpoint := range cfg.RoutingV1Endpoints {
rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)}
if endpoint != cidContactEndpoint {
rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired())
}
httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...)
if err != nil {
return nil, err
}
pr = routingClient
vs = routingClient
cr = routingClient
} else {
// If there are no delegated routing endpoints run an accelerated Amino DHT client and send IPNI requests to cid.contact
routingV1Routers = append(routingV1Routers, httpClient)
}

var dhtRouter routing.Routing
if cfg.DHTRouting != DHTOff {
var dhtHost host.Host
if cfg.DHTSharedHost {
dhtHost = h
Expand All @@ -237,54 +248,61 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
if cfg.DHTRouting == DHTAccelerated {
fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: h.Peerstore()},
}),
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.BucketSize(20),
))
if err != nil {
return nil, err
}
dhtRouter = &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
} else {
dhtRouter = standardClient
}
}

dhtRouter := &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}
if len(routingV1Routers) == 0 && dhtRouter == nil {
return nil, errors.New("no routers configured: enable dht and/or configure /routing/v1 http endpoint")
}

// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := delegatedHTTPContentRouter(ipniFallbackEndpoint, routingv1client.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}
routers := []*routinghelpers.ParallelRouter{
{
if len(routingV1Routers) == 0 {
router = dhtRouter
} else {
var routers []*routinghelpers.ParallelRouter

if dhtRouter != nil {
routers = append(routers, &routinghelpers.ParallelRouter{
Router: dhtRouter,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: false,
},
{
})
}

for _, routingV1Router := range routingV1Routers {
routers = append(routers, &routinghelpers.ParallelRouter{
Timeout: 15 * time.Second,
Router: httpRouters,
Router: routingV1Router,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: true,
},
})
}
router := routinghelpers.NewComposableParallel(routers)

pr = router
vs = router
cr = router
router = routinghelpers.NewComposableParallel(routers)
}

return pr, nil
return router, nil
}))
h, err := libp2p.New(opts...)
if err != nil {
Expand All @@ -302,7 +320,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bn := bsnet.NewFromIpfsHost(h, router)
bswap := bsclient.New(bsctx, bn, blkst,
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
Expand Down Expand Up @@ -357,7 +375,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
if cfg.IpnsMaxCacheTTL > 0 {
nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL))
}
ns, err := namesys.NewNameSystem(vs, nsOptions...)
ns, err := namesys.NewNameSystem(router, nsOptions...)
if err != nil {
return nil, err
}
Expand All @@ -376,7 +394,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
datastore: ds,
bsClient: bswap,
ns: ns,
vs: vs,
vs: router,
bsrv: bsrv,
resolver: r,
bwc: bwc,
Expand Down