Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/elasticsearch-encoding.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The `elasticsearch` sink now correctly escapes parameters such as `index` when they contain
characters that must be escaped in JSON strings (for example, newlines).
121 changes: 93 additions & 28 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{io, io::Write};

use serde::Serialize;
use serde_json::json;
use vector_lib::buffers::EventCount;
use vector_lib::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf};
use vector_lib::{
Expand Down Expand Up @@ -157,48 +158,78 @@ fn write_bulk_action(
(true, DocumentMetadata::Id(id)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_id":"{}"}}}}"#,
bulk_action, index, id
"{}",
json!({
bulk_action: {
"_index": index,
"_id": id,
}
}),
)
}
(false, DocumentMetadata::Id(id)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}"}}}}"#,
bulk_action, index, doc_type, id
"{}",
json!({
bulk_action: {
"_type": doc_type,
"_index": index,
"_id": id,
}
}),
)
}
(true, DocumentMetadata::WithoutId) => {
write!(writer, r#"{{"{}":{{"_index":"{}"}}}}"#, bulk_action, index)
write!(
writer,
"{}",
json!({
bulk_action: {
"_index": index,
}
}),
)
}
(false, DocumentMetadata::WithoutId) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_type":"{}"}}}}"#,
bulk_action, index, doc_type
"{}",
json!({
bulk_action: {
"_type": doc_type,
"_index": index,
}
}),
)
}
(true, DocumentMetadata::IdAndVersion(id, version)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#,
bulk_action,
index,
id,
version.kind.as_str(),
version.value
"{}",
json!({
bulk_action: {
"_id": id,
"_index": index,
"version_type": version.kind.as_str(),
"version": version.value,
}
}),
)
}
(false, DocumentMetadata::IdAndVersion(id, version)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#,
bulk_action,
index,
doc_type,
id,
version.kind.as_str(),
version.value
"{}",
json!({
bulk_action: {
"_id": id,
"_type": doc_type,
"_index": index,
"version_type": version.kind.as_str(),
"version": version.value,
}
}),
)
}
},
Expand All @@ -213,14 +244,15 @@ mod tests {
fn suppress_type_with_id() {
let mut writer = Vec::new();

_ = write_bulk_action(
write_bulk_action(
&mut writer,
"ACTION",
"INDEX",
"TYPE",
true,
&DocumentMetadata::Id("ID".to_string()),
);
)
.unwrap();

let value: serde_json::Value = serde_json::from_slice(&writer).unwrap();
let value = value.as_object().unwrap();
Expand All @@ -241,14 +273,15 @@ mod tests {
fn suppress_type_without_id() {
let mut writer = Vec::new();

_ = write_bulk_action(
write_bulk_action(
&mut writer,
"ACTION",
"INDEX",
"TYPE",
true,
&DocumentMetadata::WithoutId,
);
)
.unwrap();

let value: serde_json::Value = serde_json::from_slice(&writer).unwrap();
let value = value.as_object().unwrap();
Expand All @@ -268,14 +301,15 @@ mod tests {
fn type_with_id() {
let mut writer = Vec::new();

_ = write_bulk_action(
write_bulk_action(
&mut writer,
"ACTION",
"INDEX",
"TYPE",
false,
&DocumentMetadata::Id("ID".to_string()),
);
)
.unwrap();

let value: serde_json::Value = serde_json::from_slice(&writer).unwrap();
let value = value.as_object().unwrap();
Expand All @@ -297,14 +331,15 @@ mod tests {
fn type_without_id() {
let mut writer = Vec::new();

_ = write_bulk_action(
write_bulk_action(
&mut writer,
"ACTION",
"INDEX",
"TYPE",
false,
&DocumentMetadata::WithoutId,
);
)
.unwrap();

let value: serde_json::Value = serde_json::from_slice(&writer).unwrap();
let value = value.as_object().unwrap();
Expand All @@ -320,4 +355,34 @@ mod tests {
assert!(nested.contains_key("_type"));
assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE"));
}

#[test]
fn encodes_fields_with_newlines() {
    // Every string input carries a trailing newline, which has to be escaped
    // for the emitted bulk action line to parse as JSON at all.
    let mut buffer = Vec::new();

    write_bulk_action(
        &mut buffer,
        "ACTION\n",
        "INDEX\n",
        "TYPE\n",
        false,
        &DocumentMetadata::Id("ID\n".to_string()),
    )
    .unwrap();

    // Parsing the raw bytes back is the actual check that escaping happened.
    let parsed: serde_json::Value = serde_json::from_slice(&buffer).unwrap();
    let root = parsed.as_object().unwrap();

    assert!(root.contains_key("ACTION\n"));

    let action_body = root.get("ACTION\n").unwrap().as_object().unwrap();

    // Each metadata field must round-trip with its newline intact.
    let expected_fields = [("_index", "INDEX\n"), ("_id", "ID\n"), ("_type", "TYPE\n")];
    for (field, expected_value) in expected_fields {
        assert!(action_body.contains_key(field));
        assert_eq!(
            action_body.get(field).unwrap().as_str(),
            Some(expected_value)
        );
    }
}
}
39 changes: 27 additions & 12 deletions src/sinks/elasticsearch/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::convert::TryFrom;
use std::{convert::TryFrom, iter::zip};

use vector_lib::lookup::PathPrefix;

Expand Down Expand Up @@ -62,7 +62,7 @@ async fn sets_create_action_when_configured() {
let expected = r#"{"create":{"_index":"vector","_type":"_doc"}}
{"action":"crea","message":"hello there","timestamp":"2020-12-01T01:02:03Z"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down Expand Up @@ -127,10 +127,10 @@ async fn encoding_with_external_versioning_with_version_set_includes_version() {
)
.unwrap();

let expected = r#"{"create":{"_index":"vector","_type":"_doc","_id":"42","version_type":"external","version":1337}}
let expected = r#"{"create":{"_id":"42","_index":"vector","_type":"_doc","version":1337,"version_type":"external"}}
{"message":"hello there","my_field":"1337","timestamp":"2020-12-01T01:02:03Z"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down Expand Up @@ -176,10 +176,10 @@ async fn encoding_with_external_gte_versioning_with_version_set_includes_version
)
.unwrap();

let expected = r#"{"create":{"_index":"vector","_type":"_doc","_id":"42","version_type":"external_gte","version":1337}}
let expected = r#"{"create":{"_id":"42","_index":"vector","_type":"_doc","version":1337,"version_type":"external_gte"}}
{"message":"hello there","my_field":"1337","timestamp":"2020-12-01T01:02:03Z"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand All @@ -205,6 +205,21 @@ fn data_stream_body(
ds
}

/// Asserts that `encoded` holds the same newline-delimited JSON documents as `expected`.
///
/// Both inputs are split into lines and compared pairwise as parsed
/// `serde_json::Value`s rather than as raw text, so differences in object key
/// order do not cause a failure.
///
/// Panics if `encoded` is not valid UTF-8, if the line counts differ, if any
/// line fails to parse as JSON, or if a pair of parsed lines is not equal.
fn assert_expected_is_encoded(expected: &str, encoded: &[u8]) {
    // Parses a single line, panicking on malformed JSON.
    fn parse_line(line: &str) -> serde_json::Value {
        serde_json::from_str(line).unwrap()
    }

    let encoded_text = std::str::from_utf8(encoded).unwrap();

    let want: Vec<&str> = expected.lines().collect();
    let got: Vec<&str> = encoded_text.lines().collect();

    // Check the counts first: `zip` would otherwise silently drop unmatched lines.
    assert_eq!(want.len(), got.len());

    for (want_line, got_line) in zip(want, got) {
        assert_eq!(parse_line(want_line), parse_line(got_line));
    }
}

#[tokio::test]
async fn encode_datastream_mode() {
use chrono::{TimeZone, Utc};
Expand Down Expand Up @@ -252,7 +267,7 @@ async fn encode_datastream_mode() {
let expected = r#"{"create":{"_index":"synthetics-testing-default","_type":"_doc"}}
{"@timestamp":"2020-12-01T01:02:03Z","data_stream":{"dataset":"testing","namespace":"default","type":"synthetics"},"message":"hello there"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down Expand Up @@ -307,7 +322,7 @@ async fn encode_datastream_mode_no_routing() {
let expected = r#"{"create":{"_index":"logs-generic-something","_type":"_doc"}}
{"@timestamp":"2020-12-01T01:02:03Z","data_stream":{"dataset":"testing","namespace":"something","type":"synthetics"},"message":"hello there"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down Expand Up @@ -346,7 +361,7 @@ async fn handle_metrics() {
assert_eq!(encoded_lines.len(), 3); // there's an empty line at the end
assert_eq!(
encoded_lines.first().unwrap(),
r#"{"create":{"_index":"vector","_type":"_doc"}}"#
r#"{"create":{"_type":"_doc","_index":"vector"}}"#
);
assert!(encoded_lines
.get(1)
Expand Down Expand Up @@ -459,7 +474,7 @@ async fn encode_datastream_mode_no_sync() {
let expected = r#"{"create":{"_index":"synthetics-testing-something","_type":"_doc"}}
{"@timestamp":"2020-12-01T01:02:03Z","data_stream":{"dataset":"testing","type":"synthetics"},"message":"hello there"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down Expand Up @@ -495,7 +510,7 @@ async fn allows_using_except_fields() {
let expected = r#"{"index":{"_index":"purple","_type":"_doc"}}
{"foo":"bar","message":"hello there"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down Expand Up @@ -530,7 +545,7 @@ async fn allows_using_only_fields() {
let expected = r#"{"index":{"_index":"purple","_type":"_doc"}}
{"foo":"bar"}
"#;
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
assert_expected_is_encoded(expected, &encoded);
assert_eq!(encoded.len(), encoded_size);
}

Expand Down
Loading