This example demonstrates how to write a WebAssembly-based Function Stream Processor in Go, implementing a real-time counter (same logic as the Python example).
- Input: Kafka `input-topic`
- Processing: Word frequency counting with KV state
- Output: JSON format to Kafka `output-topic`
- TinyGo: https://tinygo.org/getting-started/install/
- wasm-tools: `cargo install wasm-tools`
- Go SDK toolchain: run `make -C go-sdk env` from project root
make -C ../../go-sdk build
./build.sh
# Output: build/processor.wasm

create function with ('function_path'='/path/to/examples/go-processor/build/processor.wasm','config_path'='/path/to/examples/go-processor/config.yaml')

Example:
create function with ('function_path'='/Users/zhenyuluo/lyy/data/git/function-stream/examples/go-processor/build/processor.wasm','config_path'='/Users/zhenyuluo/lyy/data/git/function-stream/examples/go-processor/config.yaml')

show functions

stop function <function-name>

Example:
stop function go-processor-example

drop function <function-name>

Example:
drop function go-processor-example

# 1. Build the WASM processor
./build.sh
# 2. Start the CLI
cd ../../
./scripts/start-cli.sh
# 3. In the CLI, create and manage the function

-- Create function
create function with ('function_path'='/Users/zhenyuluo/lyy/data/git/function-stream/examples/go-processor/build/processor.wasm','config_path'='/Users/zhenyuluo/lyy/data/git/function-stream/examples/go-processor/config.yaml')
-- Show all functions
show functions
-- Stop function (if running)
stop function go-processor-example
-- Drop function
drop function go-processor-example

Input (from Kafka input-topic):
hello
world
hello
hello
world
Output (to Kafka output-topic):
{"total_processed":1,"counter_map":{"hello":1}}
{"total_processed":2,"counter_map":{"hello":1,"world":1}}
{"total_processed":3,"counter_map":{"hello":2,"world":1}}
{"total_processed":4,"counter_map":{"hello":3,"world":1}}
{"total_processed":5,"counter_map":{"hello":3,"world":2}}

- Language: Go (compiled with TinyGo)
- Target: WASI P2 (WebAssembly System Interface Preview 2)
- State Management: KV store with key prefix configuration
- Lifecycle:
  - `fs-init`: Initialize state and configuration
  - `fs-process`: Process each message
  - `fs-close`: Clean up resources
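
The three lifecycle stages and the sample output above can be illustrated with a minimal, SDK-free sketch. Everything here is an assumption made for illustration: the `wordCounter` type and its `Init`, `Process`, and `Close` methods are hypothetical stand-ins for `fs-init`, `fs-process`, and `fs-close`, each message is assumed to be a single word, and a plain Go map takes the place of the KV store. It is not the code in `main.go`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// output mirrors the JSON shape shown in the sample output.
type output struct {
	TotalProcessed int            `json:"total_processed"`
	CounterMap     map[string]int `json:"counter_map"`
}

// wordCounter is a hypothetical stand-in for the processor. A Go map
// replaces the KV store here so the sketch runs without the SDK.
type wordCounter struct {
	total  int
	counts map[string]int
}

// Init plays the role of fs-init: set up empty state.
func (w *wordCounter) Init() {
	w.total = 0
	w.counts = make(map[string]int)
}

// Process plays the role of fs-process: count one message and return
// the JSON snapshot that would be written to the output topic.
func (w *wordCounter) Process(msg []byte) ([]byte, error) {
	w.total++
	w.counts[string(msg)]++
	return json.Marshal(output{TotalProcessed: w.total, CounterMap: w.counts})
}

// Close plays the role of fs-close: release state.
func (w *wordCounter) Close() {
	w.counts = nil
}

func main() {
	var w wordCounter
	w.Init()
	defer w.Close()

	for _, msg := range []string{"hello", "world", "hello", "hello", "world"} {
		out, err := w.Process([]byte(msg))
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(string(out))
	}
}
```

Because `encoding/json` writes map keys in sorted order, running this on the five sample messages prints the same five JSON lines listed in the sample output.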
The function requires a configuration file (config.yaml) that defines:
name: "go-processor-example"
type: processor
input-groups:
  - inputs:
      - input-type: kafka
        bootstrap_servers: "localhost:9092"
        topic: "input-topic"
        partition: 0
        group_id: "go-processor-group"
outputs:
  - output-type: kafka
    bootstrap_servers: "localhost:9092"
    topic: "output-topic"
    partition: 0

| Parameter | Type | Description | Example |
|---|---|---|---|
| `name` | string | Unique function name | "go-processor-example" |
| `type` | string | Function type | processor |
| `input-groups` | array | Input source configurations | See above |
| `outputs` | array | Output sink configurations | See above |

The processor also accepts runtime configuration via fs-init:
| Parameter | Type | Description | Default |
|---|---|---|---|
| `key_prefix` | string | Prefix for KV store keys | "" |

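One way to picture what a key prefix does is a thin wrapper that prepends the prefix to every key before it reaches the underlying store. The sketch below is only an illustration under stated assumptions: `kvStore`, `memStore`, and `prefixedStore` are hypothetical names, not the Go SDK's store API, and the real processor may apply `key_prefix` differently.

```go
package main

import "fmt"

// kvStore is a hypothetical minimal byte-oriented store interface.
type kvStore interface {
	Put(key string, value []byte)
	Get(key string) ([]byte, bool)
}

// memStore is an in-memory kvStore used only for illustration.
type memStore map[string][]byte

func (m memStore) Put(key string, value []byte) { m[key] = value }

func (m memStore) Get(key string) ([]byte, bool) {
	v, ok := m[key]
	return v, ok
}

// prefixedStore prepends a fixed prefix to every key before delegating
// to the wrapped store.
type prefixedStore struct {
	prefix string
	inner  kvStore
}

func (p prefixedStore) Put(key string, value []byte) {
	p.inner.Put(p.prefix+key, value)
}

func (p prefixedStore) Get(key string) ([]byte, bool) {
	return p.inner.Get(p.prefix + key)
}

func main() {
	shared := memStore{}
	a := prefixedStore{prefix: "a:", inner: shared}
	b := prefixedStore{prefix: "", inner: shared} // default: empty prefix

	a.Put("hello", []byte("3"))
	b.Put("hello", []byte("1"))

	va, _ := a.Get("hello")
	vb, _ := b.Get("hello")
	// The two writers do not overwrite each other: the shared store
	// holds "a:hello" and "hello" as separate keys.
	fmt.Println(string(va), string(vb), len(shared))
}
```

With distinct prefixes, several functions could share one store without their word counts colliding. With the default empty prefix, keys are stored unchanged.
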
    ┌─────────────┐
    │ Kafka Source│
    │ input-topic │
    └──────┬──────┘
           │
           ▼
┌─────────────────────┐
│  Go WASM Processor  │
│  ┌───────────────┐  │
│  │  fs-process   │  │
│  │  ┌─────────┐  │  │
│  │  │ Counter │  │  │
│  │  │  Logic  │  │  │
│  │  └────┬────┘  │  │
│  │       │       │  │
│  │  ┌────▼────┐  │  │
│  │  │ KV Store│  │  │
│  │  └─────────┘  │  │
│  └───────────────┘  │
└──────────┬──────────┘
           │
           ▼
    ┌─────────────┐
    │ Kafka Sink  │
    │output-topic │
    └─────────────┘

If you see errors about missing WIT files or WASI interfaces:
make -C ../../go-sdk bindings
./build.sh

Ensure you have the latest version of wasm-tools:
cargo install wasm-tools --force

This example requires TinyGo 0.40.0 or later with WASI P2 support:
tinygo version
# Should show: tinygo version 0.40.0 or higher

examples/go-processor/
├── build.sh # Build script
├── main.go # Go processor implementation
├── go.mod # Go module definition
├── config.yaml # Function configuration (Kafka I/O)
└── build/ # Build output
└── processor.wasm # WASI P2 component
go-sdk/
├── Makefile # Go SDK build pipeline
├── runtime.go # SDK runtime bootstrap
├── context.go # SDK context APIs
├── store.go # SDK state store APIs
├── wit/ # Generated WIT package and dependencies
└── bindings/ # Generated Go bindings