Skip to content

Commit fbfd32e

Browse files
committed
Add SourceReaderFactory param to SourceRunner
WIP
1 parent 8dfdac0 commit fbfd32e

File tree

1 file changed

+52
-43
lines changed

1 file changed

+52
-43
lines changed

workers/sourcerunner/source_runner.go

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,26 @@ import (
2525
)
2626

2727
type SourceRunner struct {
28-
host string // A host name used to contract this SourceRunner
29-
ID string
30-
sourceReader connectors.SourceReader
31-
eoi bool // Did the sourceReader reach end of input
32-
userHandler proto.Handler
33-
operators *operatorCluster
34-
job proto.Job
35-
watermarker *wmark.Watermarker
36-
watermarkTicker *time.Ticker
37-
clock clocks.Clock
38-
isHalting atomic.Bool
39-
stopLoop context.CancelFunc // Signal to stop the event loop if running
40-
stop context.CancelCauseFunc // Signal to stop all source runner processes
41-
operatorFactory proto.OperatorFactory
42-
errChan chan error
43-
batchingParams batching.EventBatcherParams
44-
outputStream chan *workerpb.Event
45-
keyEventBatcher *batching.EventBatcher[[]byte]
46-
keyEventResults chan []*handlerpb.KeyedEvent
28+
host string // A host name used to contract this SourceRunner
29+
ID string
30+
sourceReader connectors.SourceReader
31+
eoi bool // Did the sourceReader reach end of input
32+
userHandler proto.Handler
33+
operators *operatorCluster
34+
job proto.Job
35+
watermarker *wmark.Watermarker
36+
watermarkTicker *time.Ticker
37+
clock clocks.Clock
38+
isHalting atomic.Bool
39+
stopLoop context.CancelFunc // Signal to stop the event loop if running
40+
stop context.CancelCauseFunc // Signal to stop all source runner processes
41+
operatorFactory proto.OperatorFactory
42+
sourceReaderFactory func(*workerpb.Source) connectors.SourceReader
43+
errChan chan error
44+
batchingParams batching.EventBatcherParams
45+
outputStream chan *workerpb.Event
46+
keyEventBatcher *batching.EventBatcher[[]byte]
47+
keyEventResults chan []*handlerpb.KeyedEvent
4748

4849
// Checkpoint barriers are enqueued for processing in series with other events.
4950
checkpointBarrier chan *workerpb.CheckpointBarrier
@@ -53,39 +54,47 @@ type SourceRunner struct {
5354
}
5455

5556
type NewParams struct {
56-
Host string
57-
UserHandler proto.Handler
58-
Job proto.Job
59-
Clock clocks.Clock
60-
OperatorFactory proto.OperatorFactory
61-
EventBatching batching.EventBatcherParams
57+
Host string
58+
UserHandler proto.Handler
59+
Job proto.Job
60+
Clock clocks.Clock
61+
OperatorFactory proto.OperatorFactory
62+
SourceReaderFactory func(*workerpb.Source) connectors.SourceReader
63+
EventBatching batching.EventBatcherParams
6264
}
6365

6466
func New(params NewParams) *SourceRunner {
6567
if params.Clock == nil {
6668
params.Clock = clocks.NewSystemClock()
6769
}
6870

71+
if params.SourceReaderFactory == nil {
72+
params.SourceReaderFactory = func(source *workerpb.Source) connectors.SourceReader {
73+
return config.NewSourceReaderFromProto(source)
74+
}
75+
}
76+
6977
id := ksuid.New().String()
7078
log := slog.With("instanceID", "source-runner-"+id[len(id)-4:])
7179
return &SourceRunner{
72-
ID: id,
73-
host: params.Host,
74-
job: params.Job,
75-
watermarker: &wmark.Watermarker{},
76-
watermarkTicker: time.NewTicker(math.MaxInt64), // initialize with ticker that never ticks
77-
userHandler: params.UserHandler,
78-
checkpointBarrier: make(chan *workerpb.CheckpointBarrier, 1),
79-
Logger: log,
80-
clock: params.Clock,
81-
stopLoop: func() {}, // initialize with noop
82-
stop: func(err error) {}, // initialize with noop
83-
operatorFactory: params.OperatorFactory,
84-
errChan: make(chan error),
85-
batchingParams: params.EventBatching,
86-
outputStream: make(chan *workerpb.Event, 1_000),
87-
keyEventBatcher: batching.NewEventBatcher[[]byte](context.Background(), params.EventBatching),
88-
keyEventResults: make(chan []*handlerpb.KeyedEvent, 1_000),
80+
ID: id,
81+
host: params.Host,
82+
job: params.Job,
83+
watermarker: &wmark.Watermarker{},
84+
watermarkTicker: time.NewTicker(math.MaxInt64), // initialize with ticker that never ticks
85+
userHandler: params.UserHandler,
86+
checkpointBarrier: make(chan *workerpb.CheckpointBarrier, 1),
87+
Logger: log,
88+
clock: params.Clock,
89+
stopLoop: func() {}, // initialize with noop
90+
stop: func(err error) {}, // initialize with noop
91+
operatorFactory: params.OperatorFactory,
92+
sourceReaderFactory: params.SourceReaderFactory,
93+
errChan: make(chan error),
94+
batchingParams: params.EventBatching,
95+
outputStream: make(chan *workerpb.Event, 1_000),
96+
keyEventBatcher: batching.NewEventBatcher[[]byte](context.Background(), params.EventBatching),
97+
keyEventResults: make(chan []*handlerpb.KeyedEvent, 1_000),
8998
}
9099
}
91100

@@ -167,7 +176,7 @@ func (r *SourceRunner) HandleStart(ctx context.Context, msg *workerpb.StartSourc
167176

168177
r.watermarkTicker = time.NewTicker(time.Millisecond * 200)
169178

170-
r.sourceReader = config.NewSourceReaderFromProto(msg.Sources[0])
179+
r.sourceReader = r.sourceReaderFactory(msg.Sources[0])
171180
if err := r.sourceReader.SetSplits(msg.Splits); err != nil {
172181
return err
173182
}

0 commit comments

Comments
 (0)