From e874265462b3120a8153f0d6842bef7246cdbea8 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 13 Sep 2024 07:46:05 -0700 Subject: [PATCH 1/5] fix(elasticsearch): Encode bulk action parameters as JSON They are currently using string templating which, if there are special characters in the value, will end up creating an invalid JSON payload; an issue that can be difficult to track down. This happened in https://github.com/vectordotdev/vector/discussions/21288. Signed-off-by: Jesse Szwedko --- changelog.d/elasticsearch-encoding.fix.md | 2 + src/sinks/elasticsearch/encoder.rs | 100 +++++++++++++++++----- 2 files changed, 82 insertions(+), 20 deletions(-) create mode 100644 changelog.d/elasticsearch-encoding.fix.md diff --git a/changelog.d/elasticsearch-encoding.fix.md b/changelog.d/elasticsearch-encoding.fix.md new file mode 100644 index 0000000000000..1c0c12184089c --- /dev/null +++ b/changelog.d/elasticsearch-encoding.fix.md @@ -0,0 +1,2 @@ +The `elasticsearch` sink now encodes parameters such as `index` that contain characters that need to +be escaped in JSON strings. diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 2ee4f893e6c2a..bc0b638b9ce5c 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -1,6 +1,7 @@ use std::{io, io::Write}; use serde::Serialize; +use serde_json::json; use vector_lib::buffers::EventCount; use vector_lib::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use vector_lib::{ @@ -154,48 +155,78 @@ fn write_bulk_action( (true, DocumentMetadata::Id(id)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_id":"{}"}}}}"#, - bulk_action, index, id + "{}", + json!({ + bulk_action: { + "_index": index, + "_id": id, + } + }), ) } (false, DocumentMetadata::Id(id)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}"}}}}"#, - bulk_action, index, doc_type, id + "{}", + json!({ + bulk_action: { + "_type": doc_type, + "_index": index, + "_id": id, + } + }), ) } (true, DocumentMetadata::WithoutId) => { - write!(writer, r#"{{"{}":{{"_index":"{}"}}}}"#, bulk_action, index) + write!( + writer, + "{}", + json!({ + bulk_action: { + "_index": index, + } + }), + ) } (false, DocumentMetadata::WithoutId) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_type":"{}"}}}}"#, - bulk_action, index, doc_type + "{}", + json!({ + bulk_action: { + "_type": doc_type, + "_index": index, + } + }), ) } (true, DocumentMetadata::IdAndVersion(id, version)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#, - bulk_action, - index, - id, - version.kind.as_str(), - version.value + "{}", + json!({ + bulk_action: { + "_id": id, + "_index": index, + "version_type": version.kind.as_str(), + "version": version.value, + } + }), ) } (false, DocumentMetadata::IdAndVersion(id, version)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#, - bulk_action, - index, - doc_type, - id, - version.kind.as_str(), - version.value + "{}", + json!({ + bulk_action: { + "_id": id, + "_type": doc_type, + "_index": index, + "version_type": version.kind.as_str(), + "version": version.value, + } + }), ) } }, @@ -317,4 +348,33 @@ mod tests { assert!(nested.contains_key("_type")); assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE")); } + + #[test] + fn encodes_fields_with_newlines() { + let mut writer = Vec::new(); + + _ = write_bulk_action( + &mut writer, + "ACTION\n", + "INDEX\n", + "TYPE\n", + false, + &DocumentMetadata::Id("ID\n".to_string()), + ); + + let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); + let value = value.as_object().unwrap(); + + assert!(value.contains_key("ACTION\n")); + + let nested = value.get("ACTION\n").unwrap(); + let nested = nested.as_object().unwrap(); + + assert!(nested.contains_key("_index")); + assert_eq!(nested.get("_index").unwrap().as_str(), Some("INDEX\n")); + assert!(nested.contains_key("_id")); + assert_eq!(nested.get("_id").unwrap().as_str(), Some("ID\n")); + assert!(nested.contains_key("_type")); + assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE\n")); + } } From 63297202895602f377731f8981c28ba75e10d96d Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 1 Nov 2024 13:54:06 -0700 Subject: [PATCH 2/5] Fix tests Signed-off-by: Jesse Szwedko --- src/sinks/elasticsearch/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index 7125e51443d84..e359be7b590d9 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -124,7 +124,7 @@ async fn encoding_with_external_versioning_with_version_set_includes_version() { ) .unwrap(); - let expected = r#"{"create":{"_index":"vector","_type":"_doc","_id":"42","version_type":"external","version":1337}} + let expected = r#"{"create":{"_id":"42","_index":"vector","_type":"_doc","version":1337,"version_type":"external"}} {"message":"hello there","my_field":"1337","timestamp":"2020-12-01T01:02:03Z"} "#; assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); @@ -172,7 +172,7 @@ async fn encoding_with_external_gte_versioning_with_version_set_includes_version ) .unwrap(); - let expected = r#"{"create":{"_index":"vector","_type":"_doc","_id":"42","version_type":"external_gte","version":1337}} + let expected = r#"{"create":{"_id":"42","_index":"vector","_type":"_doc","version":1337,"version_type":"external_gte"}} {"message":"hello there","my_field":"1337","timestamp":"2020-12-01T01:02:03Z"} "#; assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); From 52b637d558b9b647d50539bb9f6eaf333f189612 Mon Sep 17 00:00:00 2001 From: Thomas Schneider Date: Fri, 13 Jun 2025 14:06:07 -0400 Subject: [PATCH 3/5] Use serde_json to check json equality --- src/sinks/elasticsearch/tests.rs | 35 +++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index 58ab3c1937ee8..4f89b2e306c56 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -1,4 +1,4 @@ -use std::convert::TryFrom; +use std::{convert::TryFrom, iter::zip}; use vector_lib::lookup::PathPrefix; @@ -62,7 +62,7 @@ async fn sets_create_action_when_configured() { let expected = r#"{"create":{"_index":"vector","_type":"_doc"}} {"action":"crea","message":"hello there","timestamp":"2020-12-01T01:02:03Z"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -130,7 +130,7 @@ async fn encoding_with_external_versioning_with_version_set_includes_version() { let expected = r#"{"create":{"_id":"42","_index":"vector","_type":"_doc","version":1337,"version_type":"external"}} {"message":"hello there","my_field":"1337","timestamp":"2020-12-01T01:02:03Z"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -179,7 +179,7 @@ async fn encoding_with_external_gte_versioning_with_version_set_includes_version let expected = r#"{"create":{"_id":"42","_index":"vector","_type":"_doc","version":1337,"version_type":"external_gte"}} {"message":"hello there","my_field":"1337","timestamp":"2020-12-01T01:02:03Z"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -205,6 +205,21 @@ fn data_stream_body( ds } +fn assert_expected_is_encoded(expected: &str, encoded: &[u8]) { + let encoded = std::str::from_utf8(&encoded).unwrap(); + + let expected_lines: Vec<&str> = expected.lines().collect(); + let encoded_lines: Vec<&str> = encoded.lines().collect(); + + assert_eq!(expected_lines.len(), encoded_lines.len()); + + let to_value = |s: &str| -> serde_json::Value { serde_json::from_str(s).unwrap() }; + + zip(expected_lines, encoded_lines).for_each(|(expected, encoded)| { + assert_eq!(to_value(expected), to_value(encoded)); + }); +} + #[tokio::test] async fn encode_datastream_mode() { use chrono::{TimeZone, Utc}; @@ -252,7 +267,7 @@ async fn encode_datastream_mode() { let expected = r#"{"create":{"_index":"synthetics-testing-default","_type":"_doc"}} {"@timestamp":"2020-12-01T01:02:03Z","data_stream":{"dataset":"testing","namespace":"default","type":"synthetics"},"message":"hello there"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -307,7 +322,7 @@ async fn encode_datastream_mode_no_routing() { let expected = r#"{"create":{"_index":"logs-generic-something","_type":"_doc"}} {"@timestamp":"2020-12-01T01:02:03Z","data_stream":{"dataset":"testing","namespace":"something","type":"synthetics"},"message":"hello there"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -346,7 +361,7 @@ async fn handle_metrics() { assert_eq!(encoded_lines.len(), 3); // there's an empty line at the end assert_eq!( encoded_lines.first().unwrap(), - r#"{"create":{"_index":"vector","_type":"_doc"}}"# + r#"{"create":{"_type":"_doc","_index":"vector"}}"# ); assert!(encoded_lines .get(1) @@ -459,7 +474,7 @@ async fn encode_datastream_mode_no_sync() { let expected = r#"{"create":{"_index":"synthetics-testing-something","_type":"_doc"}} {"@timestamp":"2020-12-01T01:02:03Z","data_stream":{"dataset":"testing","type":"synthetics"},"message":"hello there"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -495,7 +510,7 @@ async fn allows_using_except_fields() { let expected = r#"{"index":{"_index":"purple","_type":"_doc"}} {"foo":"bar","message":"hello there"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } @@ -530,7 +545,7 @@ async fn allows_using_only_fields() { let expected = r#"{"index":{"_index":"purple","_type":"_doc"}} {"foo":"bar"} "#; - assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); + assert_expected_is_encoded(expected, &encoded); assert_eq!(encoded.len(), encoded_size); } From f50a3c15d0c5805a87376df9dd0c5ff4a6e6904b Mon Sep 17 00:00:00 2001 From: Thomas Schneider Date: Fri, 13 Jun 2025 14:19:39 -0400 Subject: [PATCH 4/5] Remove unnecessary & --- src/sinks/elasticsearch/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index 4f89b2e306c56..9ef28cc7f811b 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -206,7 +206,7 @@ fn data_stream_body( } fn assert_expected_is_encoded(expected: &str, encoded: &[u8]) { - let encoded = std::str::from_utf8(&encoded).unwrap(); + let encoded = std::str::from_utf8(encoded).unwrap(); let expected_lines: Vec<&str> = expected.lines().collect(); let encoded_lines: Vec<&str> = encoded.lines().collect(); From 7d6c6a48493a7b9be0a0c7e5993568784d00d102 Mon Sep 17 00:00:00 2001 From: Thomas Schneider Date: Fri, 13 Jun 2025 14:26:30 -0400 Subject: [PATCH 5/5] Unwrap instead of ignoring error --- src/sinks/elasticsearch/encoder.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 388ad5fb569a7..e4f1ace676181 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -244,14 +244,15 @@ mod tests { fn suppress_type_with_id() { let mut writer = Vec::new(); - _ = write_bulk_action( + write_bulk_action( &mut writer, "ACTION", "INDEX", "TYPE", true, &DocumentMetadata::Id("ID".to_string()), - ); + ) + .unwrap(); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap(); @@ -272,14 +273,15 @@ mod tests { fn suppress_type_without_id() { let mut writer = Vec::new(); - _ = write_bulk_action( + write_bulk_action( &mut writer, "ACTION", "INDEX", "TYPE", true, &DocumentMetadata::WithoutId, - ); + ) + .unwrap(); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap(); @@ -299,14 +301,15 @@ mod tests { fn type_with_id() { let mut writer = Vec::new(); - _ = write_bulk_action( + write_bulk_action( &mut writer, "ACTION", "INDEX", "TYPE", false, &DocumentMetadata::Id("ID".to_string()), - ); + ) + .unwrap(); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap(); @@ -328,14 +331,15 @@ mod tests { fn type_without_id() { let mut writer = Vec::new(); - _ = write_bulk_action( + write_bulk_action( &mut writer, "ACTION", "INDEX", "TYPE", false, &DocumentMetadata::WithoutId, - ); + ) + .unwrap(); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap(); @@ -356,14 +360,15 @@ mod tests { fn encodes_fields_with_newlines() { let mut writer = Vec::new(); - _ = write_bulk_action( + write_bulk_action( &mut writer, "ACTION\n", "INDEX\n", "TYPE\n", false, &DocumentMetadata::Id("ID\n".to_string()), - ); + ) + .unwrap(); let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); let value = value.as_object().unwrap();