diff --git a/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
index 39cc46ca0d..baba56d1ff 100644
--- a/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
+++ b/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
@@ -1,13 +1,16 @@
---
-title: Ingestion Pipeline Sample Code
+title: CDP Consumer Pipeline Sample Code
sidebar_position: 30
---
-Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion SDK](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.
+Complete code for a small sample of CDP consumer pipeline of Stellar network ledger metadata using the Stellar Go [Ingest SDK](overview.mdx#the-ingestion-sdk-packages) paired with new [CDP SDK](https://github.com/stellar/go/tree/master/ingest/cdp) to demonstrate data pipeline from ledger metadata to derived data model with event-driven, distributed processing to sample microservice (Python script) as subscriber.
This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies).
-Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline`
+Steps:
+#1 - Put these files in a directory
+#2 - compile and run with `go mod tidy;go build -o pipeline ./.; ./pipeline`
+#3 - in separate terminal, run `python distributed_payment_subsciber.py`, this will perform distributed pipeline topology, as it receives messages with payment info from the pipeline process and does additional processing(printing it to console).
### `go.mod`
@@ -21,8 +24,8 @@ go 1.22
toolchain go1.22.1
require (
- github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58
- github.com/zeromq/goczmq v4.1.0+incompatible
+ github.com/stellar/go v0.0.0-20241008214914-7950d4254e6a
+ github.com/zeromq/goczmq v4.1.0+incompatible
)
```
@@ -33,20 +36,24 @@ require (
```go
+
package main
import (
"context"
"encoding/json"
+ "fmt"
"io"
"log"
"os"
"os/signal"
+ "github.com/pelletier/go-toml"
"github.com/pkg/errors"
"github.com/stellar/go/amount"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
+ "github.com/stellar/go/ingest/cdp"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
@@ -56,7 +63,7 @@ import (
"github.com/zeromq/goczmq"
)
-// Application specifics
+// Application payment model
type AppPayment struct {
Timestamp uint
BuyerAccountId string
@@ -65,9 +72,9 @@ type AppPayment struct {
Amount string
}
-// General stream topology
+// application data pipeline
type Message struct {
- Payload []byte
+ Payload interface{}
}
type Processor interface {
@@ -84,7 +91,7 @@ type ZeroMQOutboundAdapter struct {
}
func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
- _, err := adapter.Publisher.Write(msg.Payload)
+ _, err := adapter.Publisher.Write(msg.Payload.([]byte))
return err
}
@@ -98,12 +105,7 @@ func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
}
func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
- ledgerCloseMeta := xdr.LedgerCloseMeta{}
- err := ledgerCloseMeta.UnmarshalBinary(msg.Payload)
- if err != nil {
- return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta")
- }
-
+ ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
@@ -142,109 +144,91 @@ func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Messa
return nil
}
-type CaptiveCoreInboundAdapter struct {
- TomlParams ledgerbackend.CaptiveCoreTomlParams
+type LedgerMetadataInboundAdapter struct {
processors []Processor
historyArchiveURLs []string
- coreConfigNetworkTemplate []byte
+ dataStoreConfig datastore.DataStoreConfig
}
-func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) {
+func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)
}
-func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error {
- // Setup captive core config to use the Pubnet network
- captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams)
- if err != nil {
- return errors.Wrap(err, "error creating captive core toml")
- }
-
- captiveConfig := ledgerbackend.CaptiveCoreConfig{
- BinaryPath: adapter.TomlParams.CoreBinaryPath,
- HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs,
- Context: ctx,
- Toml: captiveCoreToml,
- }
-
- // Create a new captive core backend, the origin of ingestion pipeline
- captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig)
- if err != nil {
- return errors.Wrap(err, "error creating captive core instance")
- }
+func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {
- // Create a client to the network's history archives
- historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
+ /////////////////////////////////////////////////////////////////
+ // Note, https://github.com/stellar/go/issues/5495 will deprecate this
+ // need for this type of manual retrieval of latest ledger when
+ // use case is to initiate streaming from latest point.
+ // It proposes new mechanism to trigger ApplyLedgerMetadata() to
+ // perform automatic resolution of 'latest ledger' instead.
+ historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
- UserAgent: "my_app",
+ UserAgent: "payment_demo",
Context: ctx,
},
})
-
if err != nil {
return errors.Wrap(err, "error creating history archive client")
}
+ latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
+ /////////////////////////////////////////////////////////////////////
- // Acquire the most recent ledger on network
- latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive)
if err != nil {
return errors.Wrap(err, "error getting latest ledger")
}
- // Tell the captive core instance to emit LedgerCloseMeta starting at
- // latest network ledger and continuing indefintely, streaming.
- if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil {
- return errors.Wrap(err, "error preparing captive core ledger range")
- }
-
- // Run endless loop that receives LedgerCloseMeta from captive core for each new
- // ledger generated by the network and pushes it to next processors in pipeline
- for nextLedger := latestNetworkLedger; true; nextLedger++ {
- ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger)
- if err != nil {
- return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
- }
-
- payload, err := ledgerCloseMeta.MarshalBinary()
- if err != nil {
- return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger)
- }
+ ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)
- log.Printf("Processing Ledger %v", nextLedger)
- for _, processor := range adapter.processors {
- if err := processor.Process(ctx, Message{Payload: payload}); err != nil {
- return errors.Wrapf(err, "failed to process ledger %d", nextLedger)
- }
- }
+ pubConfig := cdp.PublisherConfig{
+ DataStoreConfig: adapter.dataStoreConfig,
+ BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
}
- return nil
+
+ fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
+ return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
+ func(lcm xdr.LedgerCloseMeta) error {
+ for _, processor := range adapter.processors {
+ if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
}
func main() {
+ // run a data pipeline that transforms Pubnet ledger metadata into payment events
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()
- // create the inbound 'source of origin' adapter,
- // imports network data using this captive core config
- networkInboundAdapter := &CaptiveCoreInboundAdapter{
- historyArchiveURLs: network.TestNetworkhistoryArchiveURLs,
- coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig,
- TomlParams: ledgerbackend.CaptiveCoreTomlParams{
- NetworkPassphrase: network.TestNetworkPassphrase,
- HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
- UseDB: true,
- CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH
- },
+ cfg, err := toml.LoadFile("config.toml")
+ if err != nil {
+ fmt.Printf("config.toml shoule be accessible in current directdory: %v\n", err)
+ return
+ }
+
+ datastoreConfig := datastore.DataStoreConfig{}
+ // Unmarshal TOML data into the Config struct
+ if err = cfg.Unmarshal(&datastoreConfig); err != nil {
+ fmt.Printf("error unmarshalling TOML config: %v\n", err)
+ return
+ }
+
+ // create the inbound source of pubnet ledger metadata
+ ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
+ historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
+ dataStoreConfig: datastoreConfig,
}
// create the app transformer to convert network data to application data model
- appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase}
+ appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}
// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
if err != nil {
- log.Printf("error creating 0MQ publisher: %v", err)
+ log.Printf("error creating 0MQ publisher: %v\n", err)
return
}
defer publisher.Destroy()
@@ -252,9 +236,30 @@ func main() {
// wire up the ingestion pipeline and let it run
appTransformer.Subscribe(outboundAdapter)
- networkInboundAdapter.Subscribe(appTransformer)
- log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx))
+ ledgerMetadataInboundAdapter.Subscribe(appTransformer)
+ log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}
+
+```
+
+
+
+### `config.toml`
+
+The CDP configuration settings, this file defines the data storage which contains the pre-generated Ledger Metadata files. Google Cloud storage is used in this example.
+
+
+
+```
+type = "GCS"
+
+[params]
+destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"
+
+[schema]
+ledgers_per_file = 1
+files_per_partition = 64000
+
```
@@ -274,11 +279,11 @@ import json
context = zmq.Context()
socket = context.socket(zmq.SUB)
-print("Collecting next 10 payments from pipeline ...")
+print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")
-for request in range(10):
+while True:
message = socket.recv()
json_object = json.loads(message)
diff --git a/docs/build/apps/ingest-sdk/overview.mdx b/docs/build/apps/ingest-sdk/overview.mdx
index bec22ec52d..714d8f2410 100644
--- a/docs/build/apps/ingest-sdk/overview.mdx
+++ b/docs/build/apps/ingest-sdk/overview.mdx
@@ -3,14 +3,14 @@ title: Overview
sidebar_position: 10
---
-This tutorial walks through how an application can leverage common streaming data patterns to ingest Stellar network transaction data using a few select packages from the Stellar Go Repo [github.com/stellar/go](https://github.com/stellar/go/blob/master/) collectively known as the 'Ingestion' SDK:
+This tutorial walks through how an application can leverage [CDP architecture](https://stellar.org/blog/developers/composable-data-platform) to create fast, lightweight Stellar Ledger Metada data pipelines using a few select packages from the Stellar Go Repo [github.com/stellar/go](https://github.com/stellar/go/blob/master/) collectively known as the 'Ingestion' SDK:
## The Ingestion SDK packages
- `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string
- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects
-- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer-centric `LedgerTransaction` model
-- `github.com/stellar/go/ingest/ledgerbackend` provides the captive core backend implementation
+- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger metadata, converts to more developer-centric `LedgerTransaction` model
+- `github.com/stellar/go/cdp` provides the CDP ledger metadata producer function for streaming pipeline integration.
- `github.com/stellar/go/network` provides convenient pre-configured settings for Testnet and Mainnet networks
- `github.com/stellar/go/xdr` a complete Golang binding to the Stellar network data model
@@ -18,14 +18,18 @@ This tutorial walks through how an application can leverage common streaming dat
### Project requirements
-To build an example streaming network ingestion pipeline from live Stellar network transaction data, you'll need:
+To use this example CDP pipeline for live Stellar network transaction data, you'll need:
- A developer workstation with [Go](https://go.dev/learn/) programming language runtime installed
- An IDE to edit Go code, [VSCode](https://code.visualstudio.com/download) is good if one is needed
- A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline`
-- `stellar-core` must be [installed](../../../validators/admin-guide/installation.mdx) on your workstation and available on your o/s PATH
+- Some familiarity to the [Stellar Ledger Metadata model](../../../learn/fundamentals/stellar-data-structures/README.mdx). It is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr).
+- Docker
+- Google Cloud Platform account:
+ - a bucket created in Google Cloud Storage(GCS)
+ - GCP [credentials in workstation environment](https://github.com/stellar/go/blob/master/services/galexie/README.md#set-up-gcp-credentials)
-The [Stellar network data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own data model internally:
+Our example application is only interested in a small subset of the overall network data model related to asset transfers triggered by Payment operation and defines its own derived data model as the goal of exercise:
@@ -41,22 +45,48 @@ The [Stellar network data model](../../../learn/fundamentals/stellar-data-struct
-The example application will run a [network ingestion pipeline](https://github.com/stellar/go/blob/master/ingest/doc.go) to derive a smaller `ApplicationPayment` model from the [Stellar network transaction data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) as 'source of origin' and thus enable the application to avoid large compute resources that would have been required for maintaining storage of the full Stellar network data model.
+The example application will perform both of CDP pipelines. A minimum of two pipelines are required for a complete end to end CDP architecture.
-The ingestion pipeline will perform three distinct stream processor roles:
+
-### Inbound Adapter
+### Ledger Metadata Export Pipeline
-Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top-level aggregate in the Stellar data model of which all [Stellar network transaction data](../../../learn/fundamentals/stellar-data-structures/README.mdx) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline.
+This pipeline needs to be initiated first, it is responsible for exporting Stellar Ledger Metadata as files to a [CDP Datastore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go#L17).
-### Transformer
+#### Determine the Datastore
-Subscribes to receive `LedgerCloseMeta` from the pipeline. Uses the Go SDK package [github.com/stellar/go/xdr](https://github.com/stellar/go/tree/master/xdr) to parse the nested network data model for payment operations and convert those into a new instance of application data model `ApplicationPayment` instances. Publishes `ApplicationPayment` to the pipeline.
+The Datastore in CDP is an interface, allowing for multiple implementations which represent different physical storage layers that can be 'plugged in' to export and consumer pipelines. Stellar provides the [GCS Datastore] as the first Datastore implementation, and this example chooses to use this existing implementation.
-### Outbound Adapter
+There will be open source contributions for implementations on other storage layers to choose from as CDP grows. If you can't find an implementation for a storage layer you would like to use, it is also possible to develop your own [ Datastore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go#L17) implementation, which is beyond scope of this example, as it entails a separate learning exercise of its own, coming soon!
+
+#### Exporting network metadata to Datastore
+
+Use Galexie, a new CDP command line program for exporting network metadata to datastores.
+
+- Follow the Galexie setup steps in [Galexie User Guide](https://github.com/stellar/go/blob/master/services/galexie/README.md#setup), to configure specifics of GCS bucket and target network.
+
+- Follow the [Galexie docker runtime instructions](https://github.com/stellar/go/blob/master/services/galexie/README.md#running-galexie) to start the export.
+ - For one time export of historical bounded range of ledgers, use `append --start --end `
+ - For a continuous export of prior ledgers and all new ledgers generated on network, use `append --start `.
+
+### Ledger Metadata Consumer Pipeline
+
+A consumer pipeline retrieves files from the GCS bucket and uses them as the origin of Ledger Metadata in a data processing pipeline. There can be many separate consumer pipelines all accessing the same Datastore at stame time. Each consumer pipeline will typically perform three distinct stream processor roles:
+
+#### Inbound Adapter
+
+The 'source of origin' for the ledger metadata in a pipeline. This processor retrieves [Ledger Metadata](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) files from the GCS Datastore, extracts the `LedgerCloseMeta` for each Ledger and publishes it onto the messaging pipeline.
+
+The go sdk provides consumer helper function [ApplyLedgerMetadata](https://github.com/stellar/go/blob/master/ingest/cdp/producer.go#L89) for automated, performant, buffered retrieval of files from the remote datastore, application code can leverage this to acquire pure `LedgerCloseMeta` data from a callback function.
+
+#### Transformer
+
+Subscribes on the pipeline to receive `LedgerCloseMeta`. Uses the Go SDK package [github.com/stellar/go/xdr](https://github.com/stellar/go/tree/master/xdr) to parse the ledger meta data model for payment operations and convert those into a new instance of application data model `AppPayment` instances. Publishes `AppPayment` to the pipeline.
+
+#### Outbound Adapter
Acts as the termination of the pipeline, it subscribes to receive `ApplicationPayment` and publishes the data off the pipeline and to an external data store, a ZeroMQ Publisher Socket, which is essentially a message broker.
### Summary
-Refer to [Ingestion Pipeline Sample Application](./ingestion-pipeline-code.mdx) for complete code demonstrating usage of the 'ingestion' SDK packages to create these adapters and transformers and run a live pipeline against the Stellar network.
+Refer to [Ingestion Pipeline Sample Application](./ingestion-pipeline-code.mdx) for complete consumer code example, demonstrating a live, streaming pipeline against the Stellar network, processing each new ledger's metadata as it is closed on the network.
diff --git a/static/assets/cdp_pipelines.png b/static/assets/cdp_pipelines.png
new file mode 100644
index 0000000000..58fef37e3b
Binary files /dev/null and b/static/assets/cdp_pipelines.png differ