diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/FalconHoseAlertAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/FalconHoseAlertAdapter.java new file mode 100644 index 0000000000..8a7da8f42e --- /dev/null +++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/FalconHoseAlertAdapter.java @@ -0,0 +1,87 @@ +package org.apache.metron.alerts.adapters; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.json.simple.JSONObject; +import org.apache.log4j.Logger; +import org.apache.metron.alerts.interfaces.AlertsAdapter; + +/** + * All messages from FalconHose are considered alerts, so this adapter + * simply constructs a normalized alert object for every message that comes + * in. + */ +public class FalconHoseAlertAdapter implements AlertsAdapter, Serializable { + + protected static final Logger LOG = Logger + .getLogger(FalconHoseAlertAdapter.class); + + public FalconHoseAlertAdapter() { + } + + public FalconHoseAlertAdapter(Map config) { + } + + @Override + public boolean initialize() { + return true; + } + + @Override + public boolean refresh() throws Exception { + return false; + } + + @Override + public Map alert(JSONObject rawMessage) { + Map alerts = new HashMap(); + + if (!rawMessage.containsKey("message")) { + return alerts; + } + + JSONObject content = (JSONObject)rawMessage.get("message"); + + String host = "unknown"; + if (content.containsKey("ip_dst_addr")) { + host = content.get("ip_dst_addr").toString(); + } + + String description = ""; + if (content.containsKey("original_string")) { + description = content.get("original_string").toString(); + } + + String alertId = generateAlertId(); + + JSONObject alert = new JSONObject(); + + alert.put("alert_id", alertId); + alert.put("designated_host", host); + alert.put("description", description); + + if (content.containsKey("SeverityName")) { + alert.put("priority", content.get("SeverityName").toString()); + } else { + alert.put("priority", "MED"); + } + + alerts.put(alertId, alert); + + return alerts; + } + + @Override + public boolean containsAlertId(String alert) { + return false; + } + + protected String generateAlertId() { + String new_UUID = System.currentTimeMillis() + "-" + UUID.randomUUID(); + return new_UUID; + + } +} diff --git a/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/FalconHoseAlertAdapterTest.java b/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/FalconHoseAlertAdapterTest.java new file mode 100644 index 0000000000..8eee60b61c --- /dev/null +++ b/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/FalconHoseAlertAdapterTest.java @@ -0,0 +1,93 @@ +package org.apache.metron.alerts.adapters; + +import java.util.Map; + +import org.json.simple.JSONObject; + +import org.apache.metron.test.AbstractConfigTest; +import org.apache.metron.alerts.adapters.FalconHoseAlertAdapter; + + /** + *
    + *
  • Title: FalconHoseAlertAdapterTest
  • + *
  • Description: Tests for FalconHoseAlertAdapter
  • + *
  • Created: January 20, 2016
  • + *
+ */ +public class FalconHoseAlertAdapterTest extends AbstractConfigTest { + + public FalconHoseAlertAdapterTest(String name) { + super(name); + } + + public void testInitializeAdapter() { + FalconHoseAlertAdapter adapter = new FalconHoseAlertAdapter(); + boolean initialized = adapter.initialize(); + assertTrue(initialized); + } + + public void testRefresh() throws Exception { + FalconHoseAlertAdapter adapter = new FalconHoseAlertAdapter(); + boolean refreshed = adapter.refresh(); + assertFalse(refreshed); + } + + public void testContainsAlertId(){ + FalconHoseAlertAdapter adapter = new FalconHoseAlertAdapter(); + boolean containsAlert = adapter.containsAlertId("test"); + assertFalse(containsAlert); + } + + public void testAlertNoMessage() { + FalconHoseAlertAdapter adapter = new FalconHoseAlertAdapter(); + + JSONObject message = new JSONObject(); + + Map alerts = adapter.alert(message); + + assertEquals(0, alerts.size()); + } + + public void testAlertEmptyMessage() { + FalconHoseAlertAdapter adapter = new FalconHoseAlertAdapter(); + + JSONObject internalMessage = new JSONObject(); + JSONObject message = new JSONObject(); + message.put("message", internalMessage); + + Map alerts = adapter.alert(message); + + assertEquals(1, alerts.size()); + + String alertId = alerts.keySet().iterator().next(); + JSONObject alert = alerts.get(alertId); + assertEquals(alertId, alert.get("alert_id")); + assertEquals("unknown", alert.get("designated_host")); + assertEquals("", alert.get("description")); + assertEquals("MED", alert.get("priority")); + } + + public void testAlert() { + FalconHoseAlertAdapter adapter = new FalconHoseAlertAdapter(); + + JSONObject internalMessage = new JSONObject(); + internalMessage.put("ip_dst_addr", "192.168.0.50"); + internalMessage.put("original_string", "this is original"); + internalMessage.put("SeverityName", "High"); + JSONObject message = new JSONObject(); + message.put("message", internalMessage); + + Map alerts = adapter.alert(message); + + assertEquals(1, alerts.size()); + + String alertId = alerts.keySet().iterator().next(); + JSONObject alert = alerts.get(alertId); + assertEquals(alertId, alert.get("alert_id")); + assertEquals("192.168.0.50", alert.get("designated_host")); + assertEquals("this is original", alert.get("description")); + assertEquals("High", alert.get("priority")); + } + +} + diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFalconHoseParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFalconHoseParser.java new file mode 100644 index 0000000000..5d1cc65bca --- /dev/null +++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicFalconHoseParser.java @@ -0,0 +1,88 @@ +package org.apache.metron.parsing.parsers; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.metron.tldextractor.BasicTldExtractor; + +@SuppressWarnings("serial") +public class BasicFalconHoseParser extends AbstractParser { + + protected static final Logger _LOG = LoggerFactory.getLogger(BasicFalconHoseParser.class); + private JSONCleaner cleaner = new JSONCleaner(); + + @SuppressWarnings("unchecked") + public JSONObject parse(byte[] msg) { + _LOG.trace("[OpenSOC] Starting to parse incoming message"); + + String rawMessage = null; + + try { + + rawMessage = new String(msg, "UTF-8"); + _LOG.trace("[OpenSOC] Received message: " + rawMessage); + + JSONObject cleanedMessage = cleaner.Clean(rawMessage); + _LOG.debug("[OpenSOC] Cleaned message: " + rawMessage); + + if (cleanedMessage == null || cleanedMessage.isEmpty()) { + throw new Exception("Unable to clean message: " + rawMessage); + } + + JSONObject payload = (JSONObject)cleanedMessage.get("event"); + + if (payload == null) { + throw new Exception("Unable to retrieve payload for message: " + + rawMessage); + } + + String originalString = ""; + for (Object k : payload.keySet()) { + originalString += " " + k.toString() + ":" + payload.get(k).toString(); + } + payload.put("original_string", originalString); + + if (payload.containsKey("LoginTime")) { + Long ts = Long.parseLong(payload.remove("LoginTime").toString()); + payload.put("timestamp", ts * 1000); + _LOG.trace("[OpenSOC] Added ts to: " + payload); + } else if (payload.containsKey("ProcessStartTime")) { + Long ts = Long.parseLong(payload.remove("ProcessStartTime").toString()); + payload.put("timestamp", ts); + _LOG.trace("[OpenSOC] Added ts to: " + payload); + } else { + payload.put("timestamp", System.currentTimeMillis()); + } + + if (payload.containsKey("UserIp")) { + String ip = payload.remove("UserIp").toString(); + payload.put("ip_src_addr", ip); + payload.put("ip_dst_addr", ip); + payload.put("ip_src_port", 0); + payload.put("ip_dst_port", 0); + } else if (payload.containsKey("ComputerName")) { + String name = payload.remove("ComputerName").toString(); + payload.put("ip_src_addr", name); + payload.put("ip_dst_addr", name); + payload.put("ip_src_port", 0); + payload.put("ip_dst_port", 0); + } + + _LOG.trace("[OpenSOC] Inner message: " + payload); + + payload.put("protocol", "http"); + _LOG.debug("[OpenSOC] Returning parsed message: " + payload); + + return payload; + } catch (Exception e) { + _LOG.error("Unable to Parse Message: " + rawMessage); + _LOG.error(e.getMessage(), e); + return null; + } + + } + + +} diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFalconHoseParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFalconHoseParserTest.java new file mode 100644 index 0000000000..0f6da129de --- /dev/null +++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicFalconHoseParserTest.java @@ -0,0 +1,76 @@ +package org.apache.metron.parsing.test; + +import java.util.Map; + +import junit.framework.TestCase; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; + +import org.apache.metron.parsing.parsers.BasicFalconHoseParser; + +public class BasicFalconHoseParserTest extends TestCase { + + private BasicFalconHoseParser falconHoseParser = null; + private JSONParser jsonParser = null; + + public BasicFalconHoseParserTest() throws Exception { + falconHoseParser = new BasicFalconHoseParser(); + jsonParser = new JSONParser(); + } + + public void testLoginAuditEvent() throws ParseException { + String rawMessage = "{\"metadata\":{\"offset\":3302,\"eventType\":\"LoginAuditEvent\"},\"event\":{\"LoginTime\":1444160709766,\"UserId\":\"tyler.baker@customer.rackspace.com\",\"UserIp\":\"50.56.228.73\",\"OperationName\":\"UserAuthenticate\",\"ServiceName\":\"TokenApi\",\"Success\":true}}"; + + Map rawMessageMap = (Map) jsonParser.parse(rawMessage); + JSONObject rawJson = (JSONObject) rawMessageMap.get("event"); + + JSONObject fhJson = falconHoseParser.parse(rawMessage.getBytes()); + + assertEquals(Long.parseLong(fhJson.get("timestamp").toString()), Long.parseLong(rawJson.get("LoginTime").toString()) * 1000); + assertEquals(fhJson.get("ip_src_addr").toString(), rawJson.get("UserIp").toString()); + assertEquals(fhJson.get("ip_dst_addr").toString(), rawJson.get("UserIp").toString()); + assertEquals(fhJson.get("ip_src_port").toString(), "0"); + assertEquals(fhJson.get("ip_dst_port").toString(), "0"); + assertEquals(fhJson.get("protocol").toString(), "http"); + assertTrue(fhJson.containsKey("original_string")); + assertTrue(fhJson.containsKey("timestamp")); + } + + public void testDetectionSummaryEvent() throws ParseException { + String rawMessage = "{\"metadata\":{\"offset\":3304,\"eventType\":\"DetectionSummaryEvent\"},\"event\":{\"ProcessStartTime\":1444168443,\"ProcessEndTime\":0,\"ProcessId\":288437472047,\"ParentProcessId\":288435542004,\"ComputerName\":\"619027-DAPPP083\",\"UserName\":\"mxaon_admin\",\"DetectName\":\"Suspicious Activity\",\"DetectDescription\":\"An administrative/reconnaissance tool was spawned under an IIS worker process\",\"Severity\":2,\"SeverityName\":\"Low\",\"FileName\":\"regsvr32.exe\",\"FilePath\":\"\\Device\\HarddiskVolume1\\Windows\\SysWOW64\",\"CommandLine\":\"regsvr32.exe /u /s C:\\Windows\\system32\\dxtmsft.dll\",\"SHA256String\":\"890c1734ed1ef6b2422a9b21d6205cf91e014add8a7f41aa5a294fcf60631a7b\",\"MD5String\":\"432be6cf7311062633459eef6b242fb5\",\"SHA1String\":\"N/A\",\"MachineDomain\":\"619027-DAPPP083\",\"FalconHostLink\":\"https://falcon.crowdstrike.com/detects/-2623836595666801992\",\"SensorId\":\"97264ff9a8b548749f41871e09c6856e\"}}"; + + Map rawMessageMap = (Map) jsonParser.parse(rawMessage); + JSONObject rawJson = (JSONObject) rawMessageMap.get("event"); + + JSONObject fhJson = falconHoseParser.parse(rawMessage.getBytes()); + + assertEquals(Long.parseLong(fhJson.get("timestamp").toString()), Long.parseLong(rawJson.get("ProcessStartTime").toString())); + assertEquals(fhJson.get("ip_src_addr").toString(), rawJson.get("ComputerName").toString()); + assertEquals(fhJson.get("ip_dst_addr").toString(), rawJson.get("ComputerName").toString()); + assertEquals(fhJson.get("ip_src_port").toString(), "0"); + assertEquals(fhJson.get("ip_dst_port").toString(), "0"); + assertEquals(fhJson.get("protocol").toString(), "http"); + assertTrue(fhJson.containsKey("original_string")); + assertTrue(fhJson.containsKey("timestamp")); + } + + public void testUserActivityAuditEvent() throws ParseException { + String rawMessage = "{\"metadata\":{\"offset\":3326,\"eventType\":\"UserActivityAuditEvent\"},\"event\":{\"UserId\":\"jason.blagg@customer.rackspace.com\",\"UserIp\":\"50.56.228.68\",\"OperationName\":\"UpdateDetectState\",\"ServiceName\":\"Detects\",\"Success\":true,\"AuditKeyValues\":[{\"Key\":\"detects\",\"ValueString\":\"6574431533307329744\"},{\"Key\":\"new_state\",\"ValueString\":\"in_progress\"}]}}"; + + Map rawMessageMap = (Map) jsonParser.parse(rawMessage); + JSONObject rawJson = (JSONObject) rawMessageMap.get("event"); + + JSONObject fhJson = falconHoseParser.parse(rawMessage.getBytes()); + + assertEquals(fhJson.get("ip_src_addr").toString(), rawJson.get("UserIp").toString()); + assertEquals(fhJson.get("ip_dst_addr").toString(), rawJson.get("UserIp").toString()); + assertEquals(fhJson.get("ip_src_port").toString(), "0"); + assertEquals(fhJson.get("ip_dst_port").toString(), "0"); + assertEquals(fhJson.get("protocol").toString(), "http"); + assertTrue(fhJson.containsKey("original_string")); + assertTrue(fhJson.containsKey("timestamp")); + } +} diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/falconhose/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/falconhose/local.yaml new file mode 100644 index 0000000000..4254bbfa37 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/falconhose/local.yaml @@ -0,0 +1,260 @@ +name: "falconhose-local" +config: + topology.workers: 1 + +components: + - id: "falconHoseParser" + className: "org.apache.metron.parsing.parsers.BasicFalconHoseParser" + - id: "genericMessageFilter" + className: "org.apache.metron.filters.GenericMessageFilter" + - id: "indexAdapter" + className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter" + - id: "alertsAdapter" + className: "org.apache.metron.alerts.adapters.FalconHoseAlertAdapter" + - id: "alertsIdentifier" + className: "org.json.simple.JSONObject" + configMethods: + - name: "put" + args: ["environment", "local"] + - name: "put" + args: ["topology", "falconhose"] + - id: "metricConfig" + className: "org.apache.commons.configuration.BaseConfiguration" + configMethods: + - name: "setProperty" + args: + - "org.apache.metron.metrics.reporter.graphite" + - "${org.apache.metron.metrics.reporter.graphite}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.reporter.console" + - "${org.apache.metron.metrics.reporter.console}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.reporter.jmx" + - "${org.apache.metron.metrics.reporter.jmx}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.graphite.address" + - "${org.apache.metron.metrics.graphite.address}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.graphite.port" + - "${org.apache.metron.metrics.graphite.port}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryParserBolt.acks" + - "${org.apache.metron.metrics.TelemetryParserBolt.acks}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryParserBolt.emits" + - "${org.apache.metron.metrics.TelemetryParserBolt.emits}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryParserBolt.fails" + - "${org.apache.metron.metrics.TelemetryParserBolt.fails}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.GenericEnrichmentBolt.acks" + - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.GenericEnrichmentBolt.emits" + - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.GenericEnrichmentBolt.fails" + - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryIndexingBolt.acks" + - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryIndexingBolt.emits" + - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryIndexingBolt.fails" + - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}" + +spouts: + - id: "testingSpout" + className: "org.apache.metron.test.spouts.GenericInternalTestSpout" + parallelism: 1 + configMethods: + - name: "withFilename" + args: + - "SampleInput/FalconHoseExampleOutput" + - name: "withRepeating" + args: + - true + +bolts: + - id: "parserBolt" + className: "org.apache.metron.parsing.TelemetryParserBolt" + configMethods: + - name: "withMessageParser" + args: + - ref: "falconHoseParser" + - name: "withOutputFieldName" + args: + - "falconhose" + - name: "withMessageFilter" + args: + - ref: "genericMessageFilter" + - name: "withMetricConfig" + args: + - ref: "metricConfig" + - id: "indexingBolt" + className: "org.apache.metron.indexing.TelemetryIndexingBolt" + configMethods: + - name: "withIndexIP" + args: + - "${es.ip}" + - name: "withIndexPort" + args: + - ${es.port} + - name: "withClusterName" + args: + - "${es.clustername}" + - name: "withIndexName" + args: + - "falconhose_index" + - name: "withIndexTimestamp" + args: + - "yyyy.MM.dd.hh" + - name: "withDocumentName" + args: + - "falconhose_doc" + - name: "withBulk" + args: + - 1 + - name: "withIndexAdapter" + args: + - ref: "indexAdapter" + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + - id: "alertsBolt" + className: "org.apache.metron.alerts.TelemetryAlertsBolt" + configMethods: + - name: "withIdentifier" + args: + - ref: "alertsIdentifier" + - name: "withMaxCacheSize" + args: [1000] + - name: "withMaxTimeRetain" + args: [3600] + - name: "withAlertsAdapter" + args: + - ref: "alertsAdapter" + - name: "withOutputFieldName" + args: ["message"] + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + - id: "alertsIndexingBolt" + className: "org.apache.metron.indexing.TelemetryIndexingBolt" + configMethods: + - name: "withIndexIP" + args: + - "${es.ip}" + - name: "withIndexPort" + args: + - ${es.port} + - name: "withClusterName" + args: + - "${es.clustername}" + - name: "withIndexName" + args: + - "alert" + - name: "withIndexTimestamp" + args: + - "yyyy.MM.ww" + - name: "withDocumentName" + args: + - "falconhose_alert" + - name: "withBulk" + args: + - 1 + - name: "withIndexAdapter" + args: + - ref: "indexAdapter" + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + - id: "errorIndexingBolt" + className: "org.apache.metron.indexing.TelemetryIndexingBolt" + configMethods: + - name: "withIndexIP" + args: + - "${es.ip}" + - name: "withIndexPort" + args: + - ${es.port} + - name: "withClusterName" + args: + - "${es.clustername}" + - name: "withIndexName" + args: + - "error" + - name: "withIndexTimestamp" + args: + - "yyyy.MM" + - name: "withDocumentName" + args: + - "falconhose_error" + - name: "withBulk" + args: + - 1 + - name: "withIndexAdapter" + args: + - ref: "indexAdapter" + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + +streams: + - name: "spout -> parser" + from: "testingSpout" + to: "parserBolt" + grouping: + type: SHUFFLE + - name: "parser -> indexing" + from: "parserBolt" + to: "indexingBolt" + grouping: + streamId: "message" + type: SHUFFLE + - name: "parser -> alerts" + from: "parserBolt" + to: "alertsBolt" + grouping: + streamId: "message" + type: FIELDS + args: ["key"] + - name: "alerts -> alertsIndexing" + from: "alertsBolt" + to: "alertsIndexingBolt" + grouping: + streamId: "alert" + type: SHUFFLE + - name: "parser -> errors" + from: "parserBolt" + to: "errorIndexingBolt" + grouping: + streamId: "error" + type: SHUFFLE + - name: "indexing -> errors" + from: "indexingBolt" + to: "errorIndexingBolt" + grouping: + streamId: "error" + type: SHUFFLE + - name: "alerts -> errors" + from: "alertsBolt" + to: "errorIndexingBolt" + grouping: + streamId: "error" + type: SHUFFLE diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/falconhose/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/falconhose/remote.yaml new file mode 100644 index 0000000000..7e95118bde --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/falconhose/remote.yaml @@ -0,0 +1,272 @@ +name: "falconhose" +config: + topology.workers: 1 + +components: + - id: "falconHoseParser" + className: "org.apache.metron.parsing.parsers.BasicFalconHoseParser" + - id: "genericMessageFilter" + className: "org.apache.metron.filters.GenericMessageFilter" + - id: "indexAdapter" + className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter" + - id: "alertsAdapter" + className: "org.apache.metron.alerts.adapters.FalconHoseAlertAdapter" + - id: "alertsIdentifier" + className: "org.json.simple.JSONObject" + configMethods: + - name: "put" + args: ["environment", "local"] + - name: "put" + args: ["topology", "falconhose"] + - id: "metricConfig" + className: "org.apache.commons.configuration.BaseConfiguration" + configMethods: + - name: "setProperty" + args: + - "org.apache.metron.metrics.reporter.graphite" + - "${org.apache.metron.metrics.reporter.graphite}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.reporter.console" + - "${org.apache.metron.metrics.reporter.console}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.reporter.jmx" + - "${org.apache.metron.metrics.reporter.jmx}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.graphite.address" + - "${org.apache.metron.metrics.graphite.address}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.graphite.port" + - "${org.apache.metron.metrics.graphite.port}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryParserBolt.acks" + - "${org.apache.metron.metrics.TelemetryParserBolt.acks}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryParserBolt.emits" + - "${org.apache.metron.metrics.TelemetryParserBolt.emits}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryParserBolt.fails" + - "${org.apache.metron.metrics.TelemetryParserBolt.fails}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.GenericEnrichmentBolt.acks" + - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.GenericEnrichmentBolt.emits" + - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.GenericEnrichmentBolt.fails" + - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryIndexingBolt.acks" + - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryIndexingBolt.emits" + - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}" + - name: "setProperty" + args: + - "org.apache.metron.metrics.TelemetryIndexingBolt.fails" + - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}" + - id: "zkHosts" + className: "storm.kafka.ZkHosts" + constructorArgs: + - "${kafka.zk}" + - id: "kafkaConfig" + className: "storm.kafka.SpoutConfig" + constructorArgs: + # zookeeper hosts + - ref: "zkHosts" + # topic name + - "${spout.kafka.topic}" + # zk root + - "" + # id + - "${spout.kafka.topic}" + properties: + - name: "startOffsetTime" + value: -1 + +spouts: + - id: "kafkaSpout" + className: "storm.kafka.KafkaSpout" + constructorArgs: + - ref: "kafkaConfig" + +bolts: + - id: "parserBolt" + className: "org.apache.metron.parsing.TelemetryParserBolt" + configMethods: + - name: "withMessageParser" + args: + - ref: "falconHoseParser" + - name: "withOutputFieldName" + args: + - "falconhose" + - name: "withMessageFilter" + args: + - ref: "genericMessageFilter" + - name: "withMetricConfig" + args: + - ref: "metricConfig" + - id: "indexingBolt" + className: "org.apache.metron.indexing.TelemetryIndexingBolt" + configMethods: + - name: "withIndexIP" + args: + - "${es.ip}" + - name: "withIndexPort" + args: + - ${es.port} + - name: "withClusterName" + args: + - "${es.clustername}" + - name: "withIndexName" + args: + - "falconhose_index" + - name: "withIndexTimestamp" + args: + - "yyyy.MM.dd.hh" + - name: "withDocumentName" + args: + - "falconhose_doc" + - name: "withBulk" + args: + - 1 + - name: "withIndexAdapter" + args: + - ref: "indexAdapter" + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + - id: "alertsBolt" + className: "org.apache.metron.alerts.TelemetryAlertsBolt" + configMethods: + - name: "withIdentifier" + args: + - ref: "alertsIdentifier" + - name: "withMaxCacheSize" + args: [1000] + - name: "withMaxTimeRetain" + args: [3600] + - name: "withAlertsAdapter" + args: + - ref: "alertsAdapter" + - name: "withOutputFieldName" + args: ["message"] + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + - id: "alertsIndexingBolt" + className: "org.apache.metron.indexing.TelemetryIndexingBolt" + configMethods: + - name: "withIndexIP" + args: + - "${es.ip}" + - name: "withIndexPort" + args: + - ${es.port} + - name: "withClusterName" + args: + - "${es.clustername}" + - name: "withIndexName" + args: + - "alert" + - name: "withIndexTimestamp" + args: + - "yyyy.MM.ww" + - name: "withDocumentName" + args: + - "falconhose_alert" + - name: "withBulk" + args: + - 1 + - name: "withIndexAdapter" + args: + - ref: "indexAdapter" + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + - id: "errorIndexingBolt" + className: "org.apache.metron.indexing.TelemetryIndexingBolt" + configMethods: + - name: "withIndexIP" + args: + - "${es.ip}" + - name: "withIndexPort" + args: + - ${es.port} + - name: "withClusterName" + args: + - "${es.clustername}" + - name: "withIndexName" + args: + - "error" + - name: "withIndexTimestamp" + args: + - "yyyy.MM" + - name: "withDocumentName" + args: + - "falconhose_error" + - name: "withBulk" + args: + - 1 + - name: "withIndexAdapter" + args: + - ref: "indexAdapter" + - name: "withMetricConfiguration" + args: + - ref: "metricConfig" + +streams: + - name: "spout -> parser" + from: "kafkaSpout" + to: "parserBolt" + grouping: + type: SHUFFLE + - name: "parser -> indexing" + from: "parserBolt" + to: "indexingBolt" + grouping: + streamId: "message" + type: SHUFFLE + - name: "parser -> alerts" + from: "parserBolt" + to: "alertsBolt" + grouping: + streamId: "message" + type: FIELDS + args: ["key"] + - name: "alerts -> alertsIndexing" + from: "alertsBolt" + to: "alertsIndexingBolt" + grouping: + streamId: "alert" + type: SHUFFLE + - name: "parser -> errors" + from: "parserBolt" + to: "errorIndexingBolt" + grouping: + streamId: "error" + type: SHUFFLE + - name: "indexing -> errors" + from: "indexingBolt" + to: "errorIndexingBolt" + grouping: + streamId: "error" + type: SHUFFLE + - name: "alerts -> errors" + from: "alertsBolt" + to: "errorIndexingBolt" + grouping: + streamId: "error" + type: SHUFFLE diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/FalconHoseExampleOutput b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/FalconHoseExampleOutput new file mode 100644 index 0000000000..8f9842d0cf --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/FalconHoseExampleOutput @@ -0,0 +1,4 @@ +{"metadata":{"offset":1,"eventType":"LoginAuditEvent"},"event":{"LoginTime":1447097764168,"UserId":"user1@example.com","UserIp":"192.168.0.50","OperationName":"UserAuthenticate","ServiceName":"TokenApi","Success":true}} +{"metadata":{"offset":2,"eventType":"LoginAuditEvent"},"event":{"LoginTime":1447097785969,"UserId":"user1@example.com","UserIp":"192.168.0.50","OperationName":"UserAuthenticate","ServiceName":"TokenApi","Success":true}} +{"metadata":{"offset":3,"eventType":"DetectionSummaryEvent"},"event":{"ProcessStartTime":1447097902,"ProcessEndTime":1447097905,"ProcessId":12345,"ParentProcessId":23456,"ComputerName":"fake-computer-1","UserName":"user2","DetectName":"Suspicious Activity","DetectDescription":"An administrative/reconnaissance tool was spawned under an IIS worker process","Severity":2,"SeverityName":"Low","FileName":"regsvr32.exe","FilePath":"\\Device\\HarddiskVolume1\\Windows\\SysWOW64","CommandLine":"regsvr32.exe /u /s C:\\Windows\\system32\\dxtmsft.dll","SHA256String":"890c1734ed1ef6b2422a9b21d6205cf91e014add8a7f41aa5a294fcf60631a7b","MD5String":"432be6cf7311062633459eef6b242fb5","SHA1String":"N/A","MachineDomain":"fake-domain-1","FalconHostLink":"https://falcon.crowdstrike.com/detects/123","SensorId":"6789"}} +{"metadata":{"offset":4,"eventType":"DetectionSummaryEvent"},"event":{"ProcessStartTime":1447097921,"ProcessEndTime":1447097921,"ProcessId":34567,"ParentProcessId":45678,"ComputerName":"fake-computer-2","UserName":"user2","DetectName":"Suspicious Activity","DetectDescription":"An administrative/reconnaissance tool was spawned under an IIS worker process","Severity":2,"SeverityName":"Low","FileName":"regsvr32.exe","FilePath":"\\Device\\HarddiskVolume1\\Windows\\SysWOW64","CommandLine":"regsvr32.exe /s C:\\Windows\\system32\\dxtrans.dll","SHA256String":"890c1734ed1ef6b2422a9b21d6205cf91e014add8a7f41aa5a294fcf60631a7b","MD5String":"432be6cf7311062633459eef6b242fb5","SHA1String":"N/A","MachineDomain":"fake-domain-2","FalconHostLink":"https://falcon.crowdstrike.com/detects/124","SensorId":"5678"}}