diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 431c1382d9..a5de8701d8 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -37,6 +37,8 @@ class RetryableResponse < StandardError; end config_param :proxy, :string, default: ENV['HTTP_PROXY'] || ENV['http_proxy'] desc 'Content-Type for HTTP request' config_param :content_type, :string, default: nil + desc 'JSON array data format for HTTP request body' + config_param :json_array, :bool, default: false desc 'Additional headers for HTTP request' config_param :headers, :hash, default: nil @@ -100,6 +102,13 @@ def configure(conf) @proxy_uri = URI.parse(@proxy) if @proxy @formatter = formatter_create @content_type = setup_content_type unless @content_type + + if @json_array + if @formatter_configs.first[:@type] != "json" + raise Fluent::ConfigError, "json_array option could be used with json formatter only" + end + define_singleton_method(:format, method(:format_json_array)) + end end def multi_workers_ready? @@ -114,6 +123,10 @@ def format(tag, time, record) @formatter.format(tag, time, record) end + def format_json_array(tag, time, record) + @formatter.format(tag, time, record) << "," + end + def write(chunk) uri = parse_endpoint(chunk) req = create_request(chunk, uri) @@ -128,7 +141,7 @@ def write(chunk) def setup_content_type case @formatter_configs.first[:@type] when 'json' - 'application/x-ndjson' + @json_array ? 'application/json' : 'application/x-ndjson' when 'csv' 'text/csv' when 'tsv', 'ltsv' @@ -202,7 +215,7 @@ def create_request(chunk, uri) req.basic_auth(@auth.username, @auth.password) end set_headers(req) - req.body = chunk.read + req.body = @json_array ? "[#{chunk.read.chop!}]" : chunk.read req end diff --git a/test/plugin/test_out_http.rb b/test/plugin/test_out_http.rb index 757c8d620a..b7b4fe9da8 100644 --- a/test/plugin/test_out_http.rb +++ b/test/plugin/test_out_http.rb @@ -80,6 +80,8 @@ def run_http_server req.body.each_line { |l| data << JSON.parse(l) } + when 'application/json' + data = JSON.parse(req.body) when 'text/plain' # Use single_value in this test req.body.each_line { |line| @@ -181,6 +183,19 @@ def test_configure_without_warn assert_not_match(/Status code 503 is going to be removed/, d.instance.log.out.logs.join) end + # Check if an exception is raised on not JSON format use + data('not_json' => 'msgpack') + def test_configure_with_json_array_err(format_type) + assert_raise(Fluent::ConfigError) do + create_driver(config + %[ + json_array true + + @type #{format_type} + + ]) + end + end + data('json' => ['json', 'application/x-ndjson'], 'ltsv' => ['ltsv', 'text/tab-separated-values'], 'msgpack' => ['msgpack', 'application/x-msgpack'], @@ -195,6 +210,14 @@ def test_configure_content_type(types) assert_equal content_type, d.instance.content_type end + # Check that json_array setting sets content_type = application/json + data('json' => 'application/json') + def test_configure_content_type_json_array(content_type) + d = create_driver(config + "json_array true") + + assert_equal content_type, d.instance.content_type + end + data('PUT' => 'put', 'POST' => 'post') def test_write_with_method(method) d = create_driver(config + "http_method #{method}") @@ -211,6 +234,21 @@ def test_write_with_method(method) assert_not_empty result.headers end + # Check that JSON at HTTP request body is valid + def test_write_with_json_array_setting + d = create_driver(config + "json_array true") + d.run(default_tag: 'test.http') do + test_events.each { |event| + d.feed(event) + } + end + + result = @@result + assert_equal 'application/json', result.content_type + assert_equal test_events, result.data + assert_not_empty result.headers + end + def test_write_with_single_value_format d = create_driver(config + %[