Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ flowchart TD
| `step.db_query_cached` | Executes a cached SQL SELECT query | pipelinesteps |
| `step.db_create_partition` | Creates a time-based table partition | pipelinesteps |
| `step.db_sync_partitions` | Ensures future partitions exist for a partitioned table | pipelinesteps |
| `step.json_response` | Writes HTTP JSON response with custom status code and headers | pipelinesteps |
| `step.json_response` | Writes HTTP JSON response with custom status code and headers. Supports `status_from` to dynamically resolve the HTTP status code from the pipeline context at runtime | pipelinesteps |
| `step.raw_response` | Writes a raw HTTP response with arbitrary content type | pipelinesteps |
| `step.pipeline_output` | Marks structured data as the pipeline's return value for extraction by `engine.ExecutePipeline()` | pipelinesteps |
| `step.json_parse` | Parses a JSON string (or `[]byte`) in the pipeline context into a structured object | pipelinesteps |
Expand Down
71 changes: 54 additions & 17 deletions module/pipeline_step_json_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (

// JSONResponseStep writes an HTTP JSON response with a custom status code and stops the pipeline.
type JSONResponseStep struct {
name string
status int
headers map[string]string
body map[string]any
bodyRaw any // for non-map bodies (arrays, literals)
bodyFrom string
tmpl *TemplateEngine
name string
status int
statusFrom string
headers map[string]string
body map[string]any
bodyRaw any // for non-map bodies (arrays, literals)
bodyFrom string
tmpl *TemplateEngine
}

// NewJSONResponseStepFactory returns a StepFactory that creates JSONResponseStep instances.
Expand Down Expand Up @@ -54,28 +55,64 @@ func NewJSONResponseStepFactory() StepFactory {
bodyRaw = config["body"]
}
bodyFrom, _ := config["body_from"].(string)
statusFrom, _ := config["status_from"].(string)

return &JSONResponseStep{
name: name,
status: status,
headers: headers,
body: body,
bodyRaw: bodyRaw,
bodyFrom: bodyFrom,
tmpl: NewTemplateEngine(),
name: name,
status: status,
statusFrom: statusFrom,
headers: headers,
body: body,
bodyRaw: bodyRaw,
bodyFrom: bodyFrom,
tmpl: NewTemplateEngine(),
}, nil
}
}

func (s *JSONResponseStep) Name() string { return s.name }

// resolveStatus returns the effective HTTP status code for the response.
// If status_from is set, it resolves the value from the pipeline context and
// converts it to an integer. The resolved value must be a whole number within
// the valid HTTP status code range (100–599); otherwise it falls back to the
// static status (or 200 by default).
func (s *JSONResponseStep) resolveStatus(pc *PipelineContext) int {
if s.statusFrom != "" {
if val := resolveBodyFrom(s.statusFrom, pc); val != nil {
var code int
valid := false
switch v := val.(type) {
case int:
code = v
valid = true
case float64:
// Only accept whole numbers — reject 404.9, etc.
if v == float64(int(v)) {
code = int(v)
valid = true
}
case int64:
code = int(v)
valid = true
}
if valid && code >= 100 && code <= 599 {
return code
}
}
}
return s.status
}

func (s *JSONResponseStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
status := s.resolveStatus(pc)

w, ok := pc.Metadata["_http_response_writer"].(http.ResponseWriter)
if !ok {
// No response writer — return the body as output without writing HTTP
responseBody := s.resolveResponseBody(pc)
output := map[string]any{
"status": s.status,
"status": status,
}
if responseBody != nil {
output["body"] = responseBody
Expand All @@ -98,7 +135,7 @@ func (s *JSONResponseStep) Execute(_ context.Context, pc *PipelineContext) (*Ste
}

// Write status code
w.WriteHeader(s.status)
w.WriteHeader(status)

// Write body
if responseBody != nil {
Expand All @@ -112,7 +149,7 @@ func (s *JSONResponseStep) Execute(_ context.Context, pc *PipelineContext) (*Ste

return &StepResult{
Output: map[string]any{
"status": s.status,
"status": status,
},
Stop: true,
}, nil
Expand Down
Loading
Loading