Go SDK Guide

Function Stream provides a WebAssembly (WASI P2) based operator SDK for Go developers. Go code is compiled to a WASM component with TinyGo and runs in the server sandbox with KV state storage and data emission support.


1. SDK Core Components

| Component | Role | Description |
| --- | --- | --- |
| fssdk | Main package | Public entry point; exposes Driver, Context, Store, and Run(driver). |
| api | Interface definitions | Defines Driver, Context, Store, Iterator, ComplexKey, and error codes. |
| impl | Runtime implementation | Bridges the Driver to the WASM host (processor WIT); internal use. |
| bindings | WIT bindings | Go code generated by wit-bindgen-go from wit/processor.wit; used by impl. |

Operators depend only on fssdk: implement Driver (or embed BaseDriver) and call fssdk.Run(&YourProcessor{}) from init().


2. Driver Interface and BaseDriver

2.1 Driver Interface

All Go operators must implement the fssdk.Driver interface. The runtime invokes the following methods at the appropriate times:

| Method | When invoked | Description |
| --- | --- | --- |
| Init(ctx, config) | Once when the function starts | Initialize state, obtain Store, parse config. |
| Process(ctx, sourceID, data) | For each incoming message | Core logic: compute, state read/write, ctx.Emit(). |
| ProcessWatermark(ctx, sourceID, watermark) | On watermark events | Handle time-based windows or out-of-order logic; may forward via EmitWatermark. |
| TakeCheckpoint(ctx, checkpointID) | When the system backs up state | Persist extra in-memory state for strong consistency. |
| CheckHeartbeat(ctx) | Periodic health check | Return false to trigger operator restart. |
| Close(ctx) | When the function shuts down | Release resources and clear references. |
| Exec(ctx, className, modules) | Optional extension | Dynamic module loading, etc.; default no-op. |
| Custom(ctx, payload) | Optional custom RPC | Request/response custom bytes; default returns a copy of payload. |
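
To make the call order in the table concrete, here is a minimal sketch that drives a recording driver through one lifecycle. It is not the real runtime: the Driver interface below is a trimmed local stand-in (no Context argument, no Exec/Custom/CheckHeartbeat), runLifecycle is a hypothetical host loop, and the uint64 checkpoint ID type is an assumption.

```go
package main

import "fmt"

// Driver is a trimmed, hypothetical stand-in for fssdk.Driver. The real
// methods also receive a Context; the checkpoint ID type is an assumption.
type Driver interface {
	Init(config map[string]string) error
	Process(sourceID uint32, data []byte) error
	ProcessWatermark(sourceID uint32, watermark uint64) error
	TakeCheckpoint(checkpointID uint64) error
	Close() error
}

// recorder appends the name of each callback so the order is observable.
type recorder struct{ calls []string }

func (r *recorder) log(name string) error {
	r.calls = append(r.calls, name)
	return nil
}

func (r *recorder) Init(map[string]string) error          { return r.log("Init") }
func (r *recorder) Process(uint32, []byte) error          { return r.log("Process") }
func (r *recorder) ProcessWatermark(uint32, uint64) error { return r.log("ProcessWatermark") }
func (r *recorder) TakeCheckpoint(uint64) error           { return r.log("TakeCheckpoint") }
func (r *recorder) Close() error                          { return r.log("Close") }

// runLifecycle is a hypothetical host loop: Init once, Process per message,
// then one watermark, one checkpoint, and Close.
func runLifecycle(d Driver, messages [][]byte) error {
	if err := d.Init(map[string]string{}); err != nil {
		return err
	}
	for _, m := range messages {
		if err := d.Process(0, m); err != nil {
			return err
		}
	}
	if err := d.ProcessWatermark(0, 1); err != nil {
		return err
	}
	if err := d.TakeCheckpoint(1); err != nil {
		return err
	}
	return d.Close()
}

func main() {
	r := &recorder{}
	if err := runLifecycle(r, [][]byte{[]byte("a"), []byte("b")}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.calls)
}
```

A recording driver like this can also serve as a test double when checking that operator code tolerates a particular callback order.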

2.2 BaseDriver

fssdk.BaseDriver provides default implementations for all of the above (no-ops, except that Custom returns a copy of its payload). Embed it and override only the methods you need:

type MyProcessor struct {
    fssdk.BaseDriver
    store fssdk.Store
}

func (p *MyProcessor) Init(ctx fssdk.Context, config map[string]string) error {
    store, err := ctx.GetOrCreateStore("my-store")
    if err != nil {
        return err
    }
    p.store = store
    return nil
}

func (p *MyProcessor) Process(ctx fssdk.Context, sourceID uint32, data []byte) error {
    // Your logic: read/write p.store and compute a result.
    // As a placeholder, the input is forwarded unchanged.
    result := data
    return ctx.Emit(0, result)
}
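
The embedding pattern above relies on ordinary Go method promotion. The following sketch shows the mechanism in isolation, using local stand-in types rather than the SDK: Driver and BaseDriver here are simplified and hypothetical, and the default CheckHeartbeat result of true is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// Driver is a two-method stand-in for the real interface (assumption).
type Driver interface {
	Process(data []byte) error
	CheckHeartbeat() bool
}

// BaseDriver mimics the idea of fssdk.BaseDriver: harmless defaults.
type BaseDriver struct{}

func (BaseDriver) Process([]byte) error { return nil }
func (BaseDriver) CheckHeartbeat() bool { return true }

// upperProcessor embeds BaseDriver and overrides only Process.
type upperProcessor struct {
	BaseDriver
	out [][]byte // collected results, standing in for ctx.Emit
}

// Process shadows the promoted BaseDriver.Process.
func (p *upperProcessor) Process(data []byte) error {
	p.out = append(p.out, bytes.ToUpper(data))
	return nil
}

func main() {
	p := &upperProcessor{}
	var d Driver = p // calls through the interface reach the override
	_ = d.Process([]byte("hello"))
	fmt.Println(string(p.out[0]), d.CheckHeartbeat()) // HELLO true
}
```

Because the override is declared on the pointer type, the value passed to the runtime should be a pointer, which matches the &MyProcessor{} form used in this guide.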

2.3 Registration Entry Point

Call fssdk.Run from init() with your Driver instance (typically a singleton):

func init() {
    fssdk.Run(&MyProcessor{})
}
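
Registration happens in init() because Go runs every init function before main, which is also why operator programs can leave main empty. A minimal sketch of that ordering follows; Run and Driver here are hypothetical stand-ins that only record the driver, not the SDK's implementation.

```go
package main

import "fmt"

// Driver is a one-method stand-in used only for this illustration.
type Driver interface{ Name() string }

// registered holds whatever driver was passed to Run.
var registered Driver

// Run is a hypothetical stand-in for fssdk.Run: it only stores the driver.
func Run(d Driver) { registered = d }

type myProcessor struct{}

func (myProcessor) Name() string { return "my-processor" }

// init runs before main, so the driver is registered by the time any
// other code in the program executes.
func init() {
	Run(myProcessor{})
}

func main() {
	fmt.Println("registered:", registered.Name())
}
```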

3. Context and Store

3.1 Context

fssdk.Context is passed into Init, Process, ProcessWatermark, etc. It provides:

| Method | Description |
| --- | --- |
| Emit(targetID uint32, data []byte) error | Send data to the given output channel. |
| EmitWatermark(targetID uint32, watermark uint64) error | Emit a watermark. |
| GetOrCreateStore(name string) (Store, error) | Get or create a KV Store by name (RocksDB-backed). |
| Config() map[string]string | Startup configuration (e.g. from config.yaml init section). |
| Close() error | Close the context; usually managed by the runtime. |
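
Since operator logic only talks to the Context through a few methods, it can be exercised outside the WASM host with a small fake. The sketch below assumes you factor your logic against a narrow interface; Emitter, fakeContext, and emitNonEmpty are hypothetical names introduced for illustration, and only the two emit signatures are taken from the table above.

```go
package main

import (
	"errors"
	"fmt"
)

// Emitter is the subset of Context used by the logic under test.
type Emitter interface {
	Emit(targetID uint32, data []byte) error
	EmitWatermark(targetID uint32, watermark uint64) error
}

// emitted is one recorded Emit call.
type emitted struct {
	target uint32
	data   []byte
}

// fakeContext records emissions in memory and can simulate a failing host.
type fakeContext struct {
	records    []emitted
	watermarks []uint64
	failEmit   bool
}

func (f *fakeContext) Emit(targetID uint32, data []byte) error {
	if f.failEmit {
		return errors.New("emit failed")
	}
	// Copy the payload so later mutation by the caller cannot change the record.
	f.records = append(f.records, emitted{targetID, append([]byte(nil), data...)})
	return nil
}

func (f *fakeContext) EmitWatermark(targetID uint32, watermark uint64) error {
	f.watermarks = append(f.watermarks, watermark)
	return nil
}

// emitNonEmpty is example operator logic: forward non-empty payloads to output 0.
func emitNonEmpty(ctx Emitter, data []byte) error {
	if len(data) == 0 {
		return nil
	}
	return ctx.Emit(0, data)
}

func main() {
	ctx := &fakeContext{}
	_ = emitNonEmpty(ctx, nil)
	_ = emitNonEmpty(ctx, []byte("event"))
	fmt.Println(len(ctx.records), string(ctx.records[0].data)) // 1 event
}
```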

3.2 Store (KV State)

fssdk.Store provides key-value and complex-key operations:

Basic API:

  • PutState(key, value []byte) error
  • GetState(key []byte) (value []byte, found bool, err error)
  • DeleteState(key []byte) error
  • ListStates(startInclusive, endExclusive []byte) ([][]byte, error): lists the keys in the half-open range [startInclusive, endExclusive), compared in byte order.

ComplexKey (multi-dimensional keys / prefix scan):

  • Put(key ComplexKey, value []byte) / Get / Delete / Merge / DeletePrefix
  • ListComplex(...) — list UserKeys by keyGroup, key, namespace and range.
  • ScanComplex(keyGroup, key, namespace []byte) (Iterator, error) — returns an iterator for large scans.

ComplexKey holds KeyGroup, Key, Namespace, and UserKey for multi-dimensional indexing and prefix queries.
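
The range semantics of the basic API can be illustrated with an in-memory stand-in. This is a sketch for reasoning and local tests only: memStore is a hypothetical type, it covers just the four basic methods, and returning keys in sorted order is an assumption about how the real RocksDB-backed store behaves.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// memStore is a hypothetical in-memory version of the basic Store API.
type memStore struct{ data map[string][]byte }

func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} }

func (s *memStore) PutState(key, value []byte) error {
	s.data[string(key)] = append([]byte(nil), value...) // store a copy
	return nil
}

func (s *memStore) GetState(key []byte) ([]byte, bool, error) {
	v, ok := s.data[string(key)]
	return v, ok, nil
}

func (s *memStore) DeleteState(key []byte) error {
	delete(s.data, string(key))
	return nil
}

// ListStates returns every key k with startInclusive <= k < endExclusive,
// compared bytewise and sorted ascending.
func (s *memStore) ListStates(startInclusive, endExclusive []byte) ([][]byte, error) {
	var keys [][]byte
	for k := range s.data {
		kb := []byte(k)
		if bytes.Compare(kb, startInclusive) >= 0 && bytes.Compare(kb, endExclusive) < 0 {
			keys = append(keys, kb)
		}
	}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
	return keys, nil
}

func main() {
	s := newMemStore()
	for _, k := range []string{"a", "b", "c"} {
		_ = s.PutState([]byte(k), []byte("1"))
	}
	keys, _ := s.ListStates([]byte("a"), []byte("c"))
	for _, k := range keys {
		fmt.Println(string(k)) // prints a, then b; c is excluded
	}
}
```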

3.3 Iterator

Store.ScanComplex returns an fssdk.Iterator:

  • HasNext() (bool, error)
  • Next() (key, value []byte, ok bool, err error)
  • Close() error — must be called when done to release resources.
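
A typical way to consume such an iterator is a loop with a deferred Close, so the handle is released on every return path. The sketch below uses a local Iterator interface with the signatures listed above and a slice-backed sliceIter as a hypothetical stand-in for what ScanComplex returns; treating ok == false from Next as "exhausted" is an assumption.

```go
package main

import "fmt"

// Iterator mirrors the method signatures listed in this section.
type Iterator interface {
	HasNext() (bool, error)
	Next() (key, value []byte, ok bool, err error)
	Close() error
}

// sliceIter is a hypothetical slice-backed iterator for local testing.
type sliceIter struct {
	keys, vals [][]byte
	pos        int
	closed     bool
}

func (it *sliceIter) HasNext() (bool, error) { return it.pos < len(it.keys), nil }

func (it *sliceIter) Next() ([]byte, []byte, bool, error) {
	if it.pos >= len(it.keys) {
		return nil, nil, false, nil
	}
	k, v := it.keys[it.pos], it.vals[it.pos]
	it.pos++
	return k, v, true, nil
}

func (it *sliceIter) Close() error { it.closed = true; return nil }

// drain counts entries and always closes the iterator. A Close error is
// reported only if iteration itself succeeded.
func drain(it Iterator) (n int, err error) {
	defer func() {
		if cerr := it.Close(); err == nil {
			err = cerr
		}
	}()
	for {
		_, _, ok, nerr := it.Next()
		if nerr != nil {
			return n, nerr
		}
		if !ok {
			return n, nil
		}
		n++
	}
}

func main() {
	it := &sliceIter{
		keys: [][]byte{[]byte("k1"), []byte("k2")},
		vals: [][]byte{[]byte("v1"), []byte("v2")},
	}
	n, err := drain(it)
	fmt.Println(n, err, it.closed) // 2 <nil> true
}
```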

4. Production Example (Word Count)

package main

import (
    "encoding/json"
    "strconv"
    "strings"

    fssdk "github.com/functionstream/function-stream/go-sdk"
)

func init() {
    fssdk.Run(&CounterProcessor{})
}

type CounterProcessor struct {
    fssdk.BaseDriver
    store          fssdk.Store
    counterMap     map[string]int64
    totalProcessed int64
    keyPrefix      string
}

func (p *CounterProcessor) Init(ctx fssdk.Context, config map[string]string) error {
    store, err := ctx.GetOrCreateStore("counter-store")
    if err != nil {
        return err
    }
    p.store = store
    p.counterMap = make(map[string]int64)
    p.totalProcessed = 0
    p.keyPrefix = strings.TrimSpace(config["key_prefix"])
    return nil
}

func (p *CounterProcessor) Process(ctx fssdk.Context, sourceID uint32, data []byte) error {
    inputStr := strings.TrimSpace(string(data))
    if inputStr == "" {
        return nil
    }
    p.totalProcessed++

    fullKey := p.keyPrefix + inputStr
    existing, found, err := p.store.GetState([]byte(fullKey))
    if err != nil {
        return err
    }
    currentCount := int64(0)
    if found {
        if n, e := strconv.ParseInt(string(existing), 10, 64); e == nil {
            currentCount = n
        }
    }
    newCount := currentCount + 1
    p.counterMap[inputStr] = newCount
    if err = p.store.PutState([]byte(fullKey), []byte(strconv.FormatInt(newCount, 10))); err != nil {
        return err
    }

    out := map[string]interface{}{
        "total_processed": p.totalProcessed,
        "counter_map":     p.counterMap,
    }
    jsonBytes, err := json.Marshal(out)
    if err != nil {
        return err
    }
    return ctx.Emit(0, jsonBytes)
}

func (p *CounterProcessor) ProcessWatermark(ctx fssdk.Context, sourceID uint32, watermark uint64) error {
    return ctx.EmitWatermark(0, watermark)
}

func (p *CounterProcessor) Close(ctx fssdk.Context) error {
    p.store = nil
    p.counterMap = nil
    return nil
}

func main() {}
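
The counter in this example is stored as decimal text, and a value that fails to parse is treated as zero. That read-increment-encode step can be isolated into a pure function and checked without a store; nextCount below is a hypothetical helper that mirrors the logic in Process, not part of the SDK.

```go
package main

import (
	"fmt"
	"strconv"
)

// nextCount decodes the stored decimal counter (if any), increments it, and
// returns both the new value and its encoded form. As in the example above,
// a missing or unparsable value counts as zero.
func nextCount(existing []byte, found bool) (int64, []byte) {
	var current int64
	if found {
		if n, err := strconv.ParseInt(string(existing), 10, 64); err == nil {
			current = n
		}
	}
	next := current + 1
	return next, []byte(strconv.FormatInt(next, 10))
}

func main() {
	n, enc := nextCount(nil, false)
	fmt.Println(n, string(enc)) // 1 1

	n, enc = nextCount([]byte("41"), true)
	fmt.Println(n, string(enc)) // 42 42

	n, enc = nextCount([]byte("oops"), true)
	fmt.Println(n, string(enc)) // 1 1
}
```

Silently resetting a corrupt value keeps the operator running, at the cost of hiding the corruption; returning an error instead is a reasonable alternative if counts must be exact.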

5. Build and Deployment

5.1 Prerequisites

  • Go: 1.23+
  • TinyGo: 0.40+ (WASI P2 support)
  • wit-bindgen-go: go install go.bytecodealliance.org/cmd/wit-bindgen-go@latest
  • wkg (optional): for fetching WIT deps — cargo install wkg --version 0.10.0
  • wasm-tools: for component validation — cargo install wasm-tools

5.2 Generate Bindings and Build

From the project root:

# Install Go SDK toolchain (wit-bindgen-go, wkg, etc.)
make -C go-sdk env

# Generate WIT bindings (from wit/processor.wit and deps)
make -C go-sdk bindings

# Build the SDK
make -C go-sdk build

In your operator project (e.g. examples/go-processor):

# Build as WASI P2 component with TinyGo
tinygo build -o build/processor.wasm -target=wasip2 .

See examples/go-processor/build.sh for the exact command.

5.3 Register and Run

Register the built processor.wasm and config.yaml as a function via the SQL CLI:

create function with (
  'function_path'='/path/to/build/processor.wasm',
  'config_path'='/path/to/config.yaml'
);

Configure name, type: processor, input-groups, and outputs (e.g. Kafka) in config.yaml. See Function Configuration and examples/go-processor/README.md.


6. Error Codes and Handling

The SDK returns errors as fssdk.SDKError; use errors.As to get the Code:

| Code | Meaning |
| --- | --- |
| ErrRuntimeInvalidDriver | Invalid Driver passed to Run. |
| ErrRuntimeNotInitialized | Runtime not initialized. |
| ErrRuntimeClosed | Runtime already closed. |
| ErrStoreInvalidName | Invalid Store name. |
| ErrStoreInternal | Store internal error. |
| ErrStoreNotFound | Store not found. |
| ErrStoreIO | Store I/O error. |
| ErrResultUnexpected | Unexpected result from host. |

Example handling:

if err != nil {
    var sdkErr *fssdk.SDKError
    if errors.As(err, &sdkErr) {
        switch sdkErr.Code {
        case fssdk.ErrStoreNotFound:
            // Handle: create or return
        default:
            // Log and propagate
        }
    }
    return err
}
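
errors.As also matches when the SDK error has been wrapped with %w further up the call stack. The following self-contained sketch shows that behavior with local stand-ins: the SDKError struct, its Msg field, the numeric code values, and the classify and wrap helpers are all hypothetical and only imitate the shape described in this section.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorCode and SDKError are local stand-ins for the SDK types (assumption:
// the real SDKError exposes a Code field and is used as a pointer).
type ErrorCode int

const (
	ErrStoreNotFound ErrorCode = iota + 1
	ErrStoreIO
)

type SDKError struct {
	Code ErrorCode
	Msg  string
}

func (e *SDKError) Error() string { return fmt.Sprintf("sdk error %d: %s", e.Code, e.Msg) }

// errPlain is an ordinary error that carries no SDK code.
var errPlain = errors.New("plain failure")

// wrap adds call-site context while keeping the original error reachable.
func wrap(err error) error { return fmt.Errorf("init store: %w", err) }

// classify maps an error to a coarse category based on its SDK code.
func classify(err error) string {
	var sdkErr *SDKError
	if !errors.As(err, &sdkErr) {
		return "other"
	}
	switch sdkErr.Code {
	case ErrStoreNotFound:
		return "not-found"
	default:
		return "sdk"
	}
}

func main() {
	err := wrap(&SDKError{Code: ErrStoreNotFound, Msg: "counter-store"})
	fmt.Println(classify(err))      // not-found
	fmt.Println(classify(errPlain)) // other
}
```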

7. Advanced State API (see advanced doc)

This guide covers only the low-level go-sdk (Driver, Context, Store, directory layout). The advanced state API (Codec, ValueState, ListState, MapState, PriorityQueueState, AggregatingState, ReducingState, Keyed* factories and usage) is provided by a separate library, go-sdk-advanced. The full reference, codec contract, constructor tables, and examples are in the advanced document.


8. Directory Layout

Low-level library (go-sdk):

go-sdk/
├── Makefile          # env / wit / bindings / build
├── fssdk.go          # Entry point and type re-exports
├── go.mod / go.sum
├── api/              # Interfaces and error codes
│   ├── driver.go     # Driver, BaseDriver
│   ├── context.go    # Context
│   ├── store.go      # Store, Iterator, ComplexKey
│   └── errors.go     # ErrorCode, SDKError
├── impl/             # Bridge to WASM host
│   ├── runtime.go
│   ├── context.go
│   └── store.go
├── state/
│   └── common/       # Shared helpers (Store type alias, DupBytes)
├── wit/              # processor.wit and deps (make wit)
└── bindings/         # Generated by wit-bindgen-go (make bindings)

For more examples and SQL operations, see examples/go-processor/README.md and the SQL CLI Guide.