@@ -21,6 +21,7 @@ import (
2121 "github.com/functionstream/functionstream/common"
2222 "github.com/functionstream/functionstream/common/model"
2323 "github.com/pkg/errors"
24+ "github.com/sirupsen/logrus"
2425 "github.com/tetratelabs/wazero"
2526 "github.com/tetratelabs/wazero/api"
2627 "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
@@ -32,19 +33,25 @@ import (
3233type FunctionInstance struct {
3334 ctx context.Context
3435 cancelFunc context.CancelFunc
35- definition model.Function
36+ definition * model.Function
3637 pc pulsar.Client
3738 readyCh chan error
39+ index int32
3840}
3941
40- func NewFunctionInstance (definition model.Function , pc pulsar.Client ) * FunctionInstance {
42+ func NewFunctionInstance (definition * model.Function , pc pulsar.Client , index int32 ) * FunctionInstance {
4143 ctx , cancelFunc := context .WithCancel (context .Background ())
44+ ctx .Value (logrus.Fields {
45+ "function-name" : definition .Name ,
46+ "function-index" : index ,
47+ })
4248 return & FunctionInstance {
4349 ctx : ctx ,
4450 cancelFunc : cancelFunc ,
4551 definition : definition ,
4652 pc : pc ,
4753 readyCh : make (chan error ),
54+ index : index ,
4855 }
4956}
5057
@@ -69,7 +76,7 @@ func (instance *FunctionInstance) Run() {
6976 stdout := common .NewChanWriter ()
7077
7178 config := wazero .NewModuleConfig ().
72- WithStdout (stdout ).WithStdin (stdin )
79+ WithStdout (stdout ).WithStdin (stdin ). WithStderr ( os . Stderr )
7380
7481 wasi_snapshot_preview1 .MustInstantiate (instance .ctx , r )
7582
@@ -82,6 +89,7 @@ func (instance *FunctionInstance) Run() {
8289 consumer , err := instance .pc .Subscribe (pulsar.ConsumerOptions {
8390 Topics : instance .definition .Inputs ,
8491 SubscriptionName : fmt .Sprintf ("function-stream-%s" , instance .definition .Name ),
92+ Type : pulsar .Failover ,
8593 })
8694 if err != nil {
8795 instance .readyCh <- errors .Wrap (err , "Error creating consumer" )
@@ -102,8 +110,6 @@ func (instance *FunctionInstance) Run() {
102110 producer .Close ()
103111 }()
104112
105- instance .readyCh <- nil
106-
107113 handleErr := func (ctx context.Context , err error , message string , args ... interface {}) {
108114 if errors .Is (err , context .Canceled ) {
109115 slog .InfoContext (instance .ctx , "function instance has been stopped" )
@@ -112,6 +118,24 @@ func (instance *FunctionInstance) Run() {
112118 slog .ErrorContext (ctx , message , args ... )
113119 }
114120
121+ // Trigger the "_start" function, WASI's "main".
122+ mod , err := r .InstantiateWithConfig (instance .ctx , wasmBytes , config )
123+ if err != nil {
124+ if exitErr , ok := err .(* sys.ExitError ); ok && exitErr .ExitCode () != 0 {
125+ handleErr (instance .ctx , err , "Function exit with code" , "code" , exitErr .ExitCode ())
126+ } else if ! ok {
127+ handleErr (instance .ctx , err , "Error instantiating function" )
128+ }
129+ return
130+ }
131+ process := mod .ExportedFunction ("process" )
132+ if process == nil {
133+ instance .readyCh <- errors .New ("No process function found" )
134+ return
135+ }
136+
137+ instance .readyCh <- nil
138+
115139 for {
116140 msg , err := consumer .Receive (instance .ctx )
117141 if err != nil {
@@ -120,14 +144,9 @@ func (instance *FunctionInstance) Run() {
120144 }
121145 stdin .ResetBuffer (msg .Payload ())
122146
123- // Trigger the "_start" function, WASI's "main".
124- _ , err = r .InstantiateWithConfig (instance .ctx , wasmBytes , config )
147+ _ , err = process .Call (instance .ctx )
125148 if err != nil {
126- if exitErr , ok := err .(* sys.ExitError ); ok && exitErr .ExitCode () != 0 {
127- handleErr (instance .ctx , err , "Function exit with code" , "code" , exitErr .ExitCode ())
128- } else if ! ok {
129- handleErr (instance .ctx , err , "Error instantiating function" )
130- }
149+ handleErr (instance .ctx , err , "Error calling process function" )
131150 return
132151 }
133152
@@ -139,16 +158,17 @@ func (instance *FunctionInstance) Run() {
139158 handleErr (instance .ctx , err , "Error sending message" , "error" , err , "messageId" , id )
140159 return
141160 }
161+ err = consumer .Ack (msg )
162+ if err != nil {
163+ handleErr (instance .ctx , err , "Error acknowledging message" , "error" , err , "messageId" , id )
164+ return
165+ }
142166 })
143167 }
144168}
145169
146- func (instance * FunctionInstance ) WaitForReady () error {
147- err := <- instance .readyCh
148- if err != nil {
149- slog .ErrorContext (instance .ctx , "Error starting function instance" , err )
150- }
151- return err
170+ func (instance * FunctionInstance ) WaitForReady () chan error {
171+ return instance .readyCh
152172}
153173
154174func (instance * FunctionInstance ) Stop () {
0 commit comments