A Production-Ready, Generic Real-Time Data Processing Framework
goStreamTemplate is a high-performance streaming data processing framework built on modern technologies (Apache Kafka, Redis, Flink). It provides clean abstractions for building real-time data pipelines for IoT, log analytics, financial monitoring, and other use cases.
- **High Performance**: Sub-millisecond latency in Lite mode, 1M+ events/sec in Full mode
- **Clean Abstractions**: Simple, composable interfaces for Event, Processor, Source, Sink
- **Pluggable Architecture**: Swap state backends, sources, and sinks with ease
- **Production Ready**: Built-in metrics, graceful shutdown, error handling
- **Scalable**: Two-mode operation (Lite for development, Full for production)
```
┌───────────────────────────────────────────┐
│               Data Sources                │
│     (IoT | Logs | Transactions | APIs)    │
└───────────────┬───────────────────────────┘
                │
┌───────────────▼───────────────────────────┐
│           Kafka Ingestion Layer           │
└───────────────┬───────────────────────────┘
                │
        ┌───────┴────────┐
        ▼                ▼
┌──────────────┐  ┌──────────────────────┐
│  Lite Mode   │  │      Full Mode       │
│   (Memory)   │  │ (Flink Distributed)  │
│ <1ms latency │  │   1M+ events/sec     │
└──────┬───────┘  └──────┬───────────────┘
       │                 │
       └────────┬────────┘
                ▼
┌─────────────────────────────────────────┐
│ State Management (Memory/Redis/Flink)   │
└────────────────────┬────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────┐
│              Output Sinks               │
│   (Kafka | Druid | Databases | Files)   │
└─────────────────────────────────────────┘
```
- Go 1.23+
- Docker & Docker Compose (for Kafka/Redis)
```bash
git clone https://github.com/clearclown/goStreamTemplate.git
cd goStreamTemplate
go mod download
```

Create a `main.go`:

```go
package main

import (
	"context"

	kafkasink "github.com/clearclown/goStreamTemplate/pkg/sink/kafka"
	kafkasource "github.com/clearclown/goStreamTemplate/pkg/source/kafka"
	"github.com/clearclown/goStreamTemplate/pkg/stream"
)

// Define your processor
type MyProcessor struct{}

func (p *MyProcessor) Setup(ctx context.Context) error {
	return nil
}

func (p *MyProcessor) Process(ctx context.Context, event stream.Event) (stream.ProcessResult, error) {
	// Your business logic here
	return stream.ProcessResult{
		Success: true,
		Output:  []stream.Event{event},
	}, nil
}

func (p *MyProcessor) Teardown(ctx context.Context) error {
	return nil
}

func main() {
	// Build your pipeline
	pipeline := stream.NewPipeline().
		From(kafkasource.NewKafkaSource(kafkasource.SourceConfig{
			Brokers: []string{"localhost:9092"},
			Topic:   "input-events",
			GroupID: "my-processor",
		})).
		Process(&MyProcessor{}).
		To(kafkasink.NewKafkaSink(kafkasink.SinkConfig{
			Brokers: []string{"localhost:9092"},
			Topic:   "output-events",
		}))

	// Run the pipeline
	ctx := context.Background()
	if err := pipeline.Run(ctx); err != nil {
		panic(err)
	}
}
```

An `Event` is the fundamental unit of data in the pipeline:
```go
type Event interface {
	GetID() string
	GetTimestamp() time.Time
	GetPartitionKey() string
	GetPayload() interface{}
	GetMetadata() map[string]string
	SetMetadata(key, value string)
}
```

A `Processor` contains your business logic:
```go
type Processor interface {
	Setup(ctx context.Context) error
	Process(ctx context.Context, event Event) (ProcessResult, error)
	Teardown(ctx context.Context) error
}
```

Sources read events; Sinks write events:
```go
type Source interface {
	Read(ctx context.Context) (Event, error)
	Close() error
}

type Sink interface {
	Write(ctx context.Context, event Event) error
	Flush(ctx context.Context) error
	Close() error
}
```

State stores provide persistent state for processors:
```go
type StateStore interface {
	Get(ctx context.Context, key string) (interface{}, error)
	Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error
	Delete(ctx context.Context, key string) error
	// ... more methods
}
```

IoT temperature monitoring:

```go
pipeline := stream.NewPipeline().
	From(kafka.NewKafkaSource(/* config */)).
	Transform(FilterOutliers(0.0, 50.0)).
	Transform(AggregateByWindow("device_id", time.Minute)).
	Process(&TemperatureProcessor{}).
	WithState(memory.NewMemoryStore()).
	To(kafka.NewKafkaSink(/* config */))
```

See `examples/iot-temperature/` for complete code.
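The `FilterOutliers(0.0, 50.0)` transform above is not shown in this README; a minimal sketch of such a range filter, modeled here as a predicate over a hypothetical numeric reading type (the names `reading` and `filterOutliers` are illustrative, not the framework's API):

```go
package main

import "fmt"

// reading is a hypothetical numeric sensor payload.
type reading struct {
	DeviceID string
	Celsius  float64
}

// filterOutliers returns a predicate that keeps readings within [min, max].
// Values outside the range are treated as sensor glitches and dropped.
func filterOutliers(min, max float64) func(reading) bool {
	return func(r reading) bool {
		return r.Celsius >= min && r.Celsius <= max
	}
}

func main() {
	keep := filterOutliers(0.0, 50.0)
	input := []reading{
		{"dev-1", 21.5},  // normal: kept
		{"dev-1", -40.0}, // below range: dropped
		{"dev-2", 99.9},  // above range: dropped
	}
	for _, r := range input {
		if keep(r) {
			fmt.Printf("pass: %s %.1f\n", r.DeviceID, r.Celsius)
		}
	}
}
```

A real transform would wrap this predicate in whatever `Transform` signature the framework expects and read the value from the event payload.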
Log analytics with error-rate alerting:

```go
pipeline := stream.NewPipeline().
	From(kafka.NewKafkaSource(/* config */)).
	Transform(&LogParserTransform{}).
	Transform(FilterErrors()).
	Process(&ErrorRateProcessor{threshold: 0.05}).
	To(alerting.NewSlackSink(/* config */))
```

See `examples/log-analytics/` for complete code.
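The `ErrorRateProcessor` alerts when errors exceed 5% of recent traffic. Its implementation is not shown here, but the core calculation might be sketched as follows (the `errorRate` helper and plain-slice window are assumptions; a real processor would maintain a sliding time window):

```go
package main

import "fmt"

// errorRate returns the fraction of log lines in the window that are errors.
func errorRate(isError []bool) float64 {
	if len(isError) == 0 {
		return 0
	}
	errs := 0
	for _, e := range isError {
		if e {
			errs++
		}
	}
	return float64(errs) / float64(len(isError))
}

func main() {
	// 1 error out of 10 lines -> rate 0.10, above the 0.05 threshold.
	window := []bool{false, false, true, false, false, false, false, false, false, false}
	rate := errorRate(window)
	fmt.Printf("error rate: %.2f\n", rate)
	if rate > 0.05 {
		fmt.Println("ALERT: error rate above threshold")
	}
}
```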
Real-time fraud detection:

```go
pipeline := stream.NewPipeline().
	From(kafka.NewKafkaSource(/* config */)).
	Transform(&UserProfileEnricher{}).
	Process(&FraudDetectionProcessor{
		rules: []Rule{
			VelocityRule{maxPerMin: 10},
			AmountRule{maxAmount: 10000},
		},
	}).
	To(kafka.NewKafkaSink(/* config */))
```

See `examples/fraud-detection/` for complete code.
- IoT Data Processing: Real-time sensor data processing, anomaly detection
- Log Analytics: Web server logs, application logs, security monitoring
- Financial Transactions: Real-time fraud detection, risk assessment
- E-commerce: Clickstream analysis, recommendation engines
- Gaming: Player analytics, anti-cheat systems
| Metric | Lite Mode | Full Mode |
|---|---|---|
| Latency (p99) | < 1ms | < 10ms |
| Throughput | 10K events/sec | 1M+ events/sec |
| Scalability | Single node | Linear scaling |
| State Size | ~10GB | Unlimited |
```go
config := stream.PipelineConfig{
	Parallelism:      4,                // Number of workers
	BufferSize:       1000,             // Event buffer size
	GracefulShutdown: 30 * time.Second, // Shutdown timeout
}

pipeline := stream.NewPipelineWithConfig(config)
```

State backends are interchangeable:

```go
// Memory (for development)
store := memory.NewMemoryStore()

// Redis (for production)
store := redis.NewRedisStore(redis.Config{
	Addr: "localhost:6379",
})

// Badger (embedded database)
store := badger.NewBadgerStore("/path/to/data")
```

- QUICKSTART.md - Get started in 5 minutes
- ARCHITECTURE.md - Deep dive into system design
- API_REFERENCE.md - Complete API documentation
- USE_CASES.md - Common use case patterns
```bash
# Run unit tests
go test ./...

# Run benchmarks
go test -bench=. -benchmem ./...

# Run integration tests (requires Docker)
docker-compose up -d
go test ./tests/integration/...
```

Project layout:

```
goStreamTemplate/
├── pkg/
│   ├── stream/       # Core interfaces and pipeline
│   ├── state/        # State store interfaces and implementations
│   ├── source/       # Data sources (Kafka, HTTP, files)
│   ├── sink/         # Data sinks (Kafka, Druid, databases)
│   ├── transform/    # Data transformations
│   └── monitoring/   # Metrics and observability
├── cmd/              # Application entry points
├── examples/         # Example use cases
├── deployments/      # Docker Compose, Kubernetes manifests
└── docs/             # Documentation
```
```bash
# Build all binaries
make build

# Build Docker images
make docker-build

# Run tests
make test
```

This framework is derived from goHTLL, a research project for Autonomous Intersection Management (AIM) that demonstrated the power of real-time streaming architectures.
MIT License - See LICENSE for details
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
Made with ❤️ for the real-time data processing community