37 changes: 0 additions & 37 deletions metron-deployment/roles/metron_streaming/tasks/grok_upload.yml

This file was deleted.

3 changes: 0 additions & 3 deletions metron-deployment/roles/metron_streaming/tasks/main.yml
@@ -33,9 +33,6 @@
- include: hdfs_filesystem.yml
run_once: true

- include: grok_upload.yml
run_once: true

- include: topologies.yml

- include: source_config.yml
@@ -2,7 +2,7 @@
"parserClassName": "org.apache.metron.parsers.GrokParser",
"sensorTopic": "squid",
"parserConfig": {
"grokPath": "/patterns/squid",
"grokPattern": "SQUID_DELIMITED %{NUMBER:timestamp}[^0-9]*%{INT:elapsed} %{IP:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}[^0-9]*(%{IP:ip_dst_addr})?",
"patternLabel": "SQUID_DELIMITED",
"timestampField": "timestamp"
},
@@ -1,11 +1,35 @@
{
"parserClassName":"org.apache.metron.parsers.websphere.GrokWebSphereParser",
"sensorTopic":"websphere",
"parserConfig":
{
"grokPath":"/patterns/websphere",
"patternLabel":"WEBSPHERE",
"timestampField":"timestamp_string",
"dateFormat":"yyyy MMM dd HH:mm:ss"
"parserClassName": "org.apache.metron.parsers.websphere.GrokWebSphereParser",
"sensorTopic": "websphere",
"parserConfig": {
"grokPattern": [
"# Days - two digit number is used",
"DAY \\d{1,2}",
"# Time - two digit hour, minute, and second",
"TIME \\d{2}:\\d{2}:\\d{2}",
"# Timestamp - month, day, and time",
"TIMESTAMP %{MONTH:UNWANTED}\\s+%{DAY:UNWANTED} %{TIME:UNWANTED}",
"# Generic word field",
"WORD \\w+",
"# Priority",
"PRIORITY \\d+",
"# Log start - the first part of the log line",
"LOGSTART <%{PRIORITY:priority}>?%{TIMESTAMP:timestamp_string} %{WORD:hostname}",
"# Security domain",
"SECURITY_DOMAIN [%{WORD:security_domain}]",
"# Log middle - the middle part of the log line",
"LOGMIDDLE (\\[%{WORD:security_domain}\\])?\\[%{WORD:event_code}\\]\\[%{WORD:event_type}\\]\\[%{WORD:severity}\\]",
"# Define IP address formats",
"IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:)))(%.+)?",
"IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])",
"IP (?:%{IPV6:UNWANTED}|%{IPV4:UNWANTED})",
"# Message - the message body of the log",
"MESSAGE .*",
"# WebSphere - the entire log message",
"WEBSPHERE %{LOGSTART:UNWANTED} %{LOGMIDDLE:UNWANTED} %{MESSAGE:message}"
],
"patternLabel": "WEBSPHERE",
"timestampField": "timestamp_string",
"dateFormat": "yyyy MMM dd HH:mm:ss"
}
}
@@ -9,7 +9,8 @@
],
"parserConfig":
{
"grokPath":"/patterns/yaf",
"grokPattern":["YAF_TIME_FORMAT %{YEAR:UNWANTED}-%{MONTHNUM:UNWANTED}-%{MONTHDAY:UNWANTED}[T ]%{HOUR:UNWANTED}:%{MINUTE:UNWANTED}:%{SECOND:UNWANTED}",
"YAF_DELIMITED %{YAF_TIME_FORMAT:start_time}\\|%{YAF_TIME_FORMAT:end_time}\\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\\|%{SPACE:UNWANTED}%{INT:protocol}\\|%{SPACE:UNWANTED}%{IP:ip_src_addr}\\|%{SPACE:UNWANTED}%{INT:ip_src_port}\\|%{SPACE:UNWANTED}%{IP:ip_dst_addr}\\|%{SPACE:UNWANTED}%{INT:ip_dst_port}\\|%{SPACE:UNWANTED}%{DATA:iflags}\\|%{SPACE:UNWANTED}%{DATA:uflags}\\|%{SPACE:UNWANTED}%{DATA:riflags}\\|%{SPACE:UNWANTED}%{DATA:ruflags}\\|%{SPACE:UNWANTED}%{WORD:isn}\\|%{SPACE:UNWANTED}%{DATA:risn}\\|%{SPACE:UNWANTED}%{DATA:tag}\\|%{GREEDYDATA:rtag}\\|%{SPACE:UNWANTED}%{INT:pkt}\\|%{SPACE:UNWANTED}%{INT:oct}\\|%{SPACE:UNWANTED}%{INT:rpkt}\\|%{SPACE:UNWANTED}%{INT:roct}\\|%{SPACE:UNWANTED}%{INT:app}\\|%{GREEDYDATA:end_reason}"],
"patternLabel":"YAF_DELIMITED",
"timestampField":"start_time",
"timeFields": ["start_time", "end_time"],
@@ -21,16 +21,13 @@
import com.google.common.base.Splitter;
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.Match;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
@@ -47,16 +44,26 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(GrokParser.class);

protected transient Grok grok;
protected String grokPath;
protected String grokPattern;
protected String patternLabel;
protected List<String> timeFields = new ArrayList<>();
protected String timestampField;
protected SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S z");
protected String patternsCommonDir = "/patterns/common";
protected String patternsCommonPath = "/patterns/common";

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, Object> parserConfig) {
this.grokPath = (String) parserConfig.get("grokPath");
Object grokPattern = parserConfig.get("grokPattern");
if (grokPattern instanceof String) {
this.grokPattern = (String) grokPattern;
} else if (grokPattern instanceof String[]){
String[] patterns = (String[]) grokPattern;
this.grokPattern = Joiner.on('\n').join(patterns);
} else if (grokPattern instanceof Iterable) {
Iterable<String> patterns = (Iterable<String>) grokPattern;
this.grokPattern = Joiner.on('\n').join(patterns);
}
this.patternLabel = (String) parserConfig.get("patternLabel");
this.timestampField = (String) parserConfig.get("timestampField");
List<String> timeFieldsParam = (List<String>) parserConfig.get("timeFields");
@@ -77,41 +84,30 @@ public void configure(Map<String, Object> parserConfig) {
}
}

public InputStream openInputStream(String streamName) throws IOException {
FileSystem fs = FileSystem.get(new Configuration());
Path path = new Path(streamName);
if(fs.exists(path)) {
return fs.open(path);
} else {
return getClass().getResourceAsStream(streamName);
}
}

@Override
public void init() {
grok = new Grok();
try {
InputStream commonInputStream = openInputStream(patternsCommonDir);
InputStream commonInputStream = getClass().getResourceAsStream(patternsCommonPath);
if (LOG.isDebugEnabled()) {
LOG.debug("Grok parser loading common patterns from: " + patternsCommonDir);
LOG.debug("Grok parser loading common patterns from: " + patternsCommonPath);
}

if (commonInputStream == null) {
throw new RuntimeException(
"Unable to initialize grok parser: Unable to load " + patternsCommonDir + " from either classpath or HDFS");
"Unable to initialize grok parser: Unable to load " + patternsCommonPath + " from classpath");
}

grok.addPatternFromReader(new InputStreamReader(commonInputStream));

if (LOG.isDebugEnabled()) {
LOG.debug("Loading parser-specific patterns from: " + grokPath);
LOG.debug("Loading parser-specific patterns: " + grokPattern);
}

InputStream patterInputStream = openInputStream(grokPath);
if (patterInputStream == null) {
throw new RuntimeException("Grok parser unable to initialize grok parser: Unable to load " + grokPath
+ " from either classpath or HDFS");
if (grokPattern == null) {
throw new RuntimeException("Unable to initialize grok parser: grokPattern config property is empty");
}
grok.addPatternFromReader(new InputStreamReader(patterInputStream));
grok.addPatternFromReader(new InputStreamReader(new ByteArrayInputStream(grokPattern.getBytes())));

if (LOG.isDebugEnabled()) {
LOG.debug("Grok parser set the following grok expression: " + grok.getNamedRegexCollectionById(patternLabel));
@@ -150,8 +146,8 @@ public List<JSONObject> parse(byte[] rawMessage) {

if (message.size() == 0)
throw new RuntimeException("Grok statement produced a null message. Original message was: "
+ originalMessage + " and the parsed message was: " + message + " . Check the pattern at: "
+ grokPath);
+ originalMessage + " , parsed message was: " + message + " , pattern was: "
+ (LOG.isDebugEnabled() ? grokPattern : (patternLabel + " (Turn on DEBUG logging to see pattern text.)")));

message.put("original_string", originalMessage);
for (String timeField : timeFields) {
@@ -199,6 +195,12 @@ public boolean validate(JSONObject message) {
return false;
}

@Override
public void configurationUpdated(Map<String, Object> parserConfig) {
configure(parserConfig);
init();
}

protected void postParse(JSONObject message) {}

protected long toEpoch(String datetime) throws ParseException {
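
For reference, a minimal usage sketch (not part of this PR) of the new inline grokPattern handling in GrokParser.configure()/init() shown above. The driver class name is hypothetical, and the config values are copied from the squid parser config earlier in this diff:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.metron.parsers.GrokParser;

public class GrokParserInlinePatternExample {
  public static void main(String[] args) {
    // Equivalent of the squid parserConfig JSON. grokPattern may be a String, String[],
    // or Iterable<String>; list entries are joined with '\n' by configure().
    Map<String, Object> parserConfig = new HashMap<>();
    parserConfig.put("grokPattern", Arrays.asList(
        "SQUID_DELIMITED %{NUMBER:timestamp}[^0-9]*%{INT:elapsed} %{IP:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}[^0-9]*(%{IP:ip_dst_addr})?"));
    parserConfig.put("patternLabel", "SQUID_DELIMITED");
    parserConfig.put("timestampField", "timestamp");

    GrokParser parser = new GrokParser();
    parser.configure(parserConfig); // reads grokPattern, patternLabel, timestampField
    parser.init();                  // registers the inline pattern plus /patterns/common from the classpath
    // parser.parse(rawMessageBytes) can now be called without any pattern file in HDFS.
  }
}

With grokPath gone, no pattern upload to HDFS (the deleted grok_upload.yml task) is needed before the parser topology starts.
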
@@ -25,23 +25,28 @@
import backtype.storm.tuple.Values;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredParserBolt;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.FieldTransformer;
import org.apache.metron.common.configuration.FieldValidator;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.dsl.Context;
import org.apache.metron.common.dsl.FunctionResolver;
import org.apache.metron.common.dsl.StellarFunctions;
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.parsers.filters.Filters;
import org.apache.metron.common.configuration.FieldTransformer;
import org.apache.metron.parsers.filters.GenericMessageFilter;
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class ParserBolt extends ConfiguredParserBolt implements Serializable {

@@ -51,6 +56,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
private MessageFilter<JSONObject> filter = new GenericMessageFilter();
private WriterHandler writer;
private org.apache.metron.common.dsl.Context stellarContext;
protected AtomicBoolean configUpdatedFlag = new AtomicBoolean(false);
public ParserBolt( String zookeeperUrl
, String sensorType
, MessageParser<JSONObject> parser
@@ -109,13 +115,20 @@ protected void initializeStellar() {
@Override
public void execute(Tuple tuple) {
byte[] originalMessage = tuple.getBinary(0);

//Config update check and config read must be done together
boolean updateConfig = configUpdatedFlag.getAndSet(false);
SensorParserConfig sensorParserConfig = getSensorParserConfig();

try {
//we want to ack the tuple in the situation where we are not doing a bulk write
//otherwise we want to defer to the writerComponent, which will ack on bulk commit.
boolean ackTuple = !writer.handleAck();
int numWritten = 0;
if(sensorParserConfig != null) {
if (updateConfig) {
parser.configurationUpdated(sensorParserConfig.getParserConfig());
}
List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
Optional<List<JSONObject>> messages = parser.parseOptional(originalMessage);
for (JSONObject message : messages.orElse(Collections.emptyList())) {
@@ -168,4 +181,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(Constants.INVALID_STREAM, new Fields("message"));
declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
}

@Override
public void updateConfig(String path, byte[] data) throws IOException {
super.updateConfig(path, data);
String pathWithoutTrailingSlash = path.replaceAll("/+$", "");
if (pathWithoutTrailingSlash.equals(ConfigurationType.PARSER.getZookeeperRoot() + "/" + getSensorType())) {

Member:

I'm concerned that there may be at least one segment of structured name between sensorType and a value that may have changed, thus still requiring a "starts with"-like calculation. How about replacing both lines 188 and 189 with:

if (path.matches("^" + ConfigurationType.PARSER.getZookeeperRoot() + "/" + getSensorType() + "(/|$)")) {

Contributor Author:

I'm sorry, I don't follow. ConfigurationType.PARSER.getZookeeperRoot() is constant and sensorType is a leaf node in Zookeeper. Maybe an example will help.

Member:

Ok, if sensorType is known to always be a leaf node in ZK, that's sufficient. Thanks.

configUpdatedFlag.set(true);
}
}
}
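
To make the review discussion above concrete, here is a standalone sketch (not part of the PR) of how the merged path check behaves. The class name and the example parser root are assumptions; the root merely stands in for ConfigurationType.PARSER.getZookeeperRoot():

public class ParserConfigPathCheckExample {

  // Mirrors the check in ParserBolt.updateConfig(): strip trailing slashes, then
  // compare against "<parser zookeeper root>/<sensorType>".
  static boolean isOwnParserConfigPath(String path, String parserRoot, String sensorType) {
    String pathWithoutTrailingSlash = path.replaceAll("/+$", "");
    return pathWithoutTrailingSlash.equals(parserRoot + "/" + sensorType);
  }

  public static void main(String[] args) {
    String parserRoot = "/example/parsers"; // assumed stand-in for ConfigurationType.PARSER.getZookeeperRoot()
    String sensorType = "websphere";

    System.out.println(isOwnParserConfigPath("/example/parsers/websphere", parserRoot, sensorType));        // true
    System.out.println(isOwnParserConfigPath("/example/parsers/websphere/", parserRoot, sensorType));       // true
    System.out.println(isOwnParserConfigPath("/example/parsers/squid", parserRoot, sensorType));            // false
    System.out.println(isOwnParserConfigPath("/example/parsers/websphere/child", parserRoot, sensorType));  // false
  }
}

Because the sensor type is a leaf node in Zookeeper, as clarified in the thread, the equality check only fires for the sensor's own config node, and changes to other sensors never set configUpdatedFlag.
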
@@ -17,7 +17,6 @@
*/
package org.apache.metron.parsers.interfaces;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -52,4 +51,7 @@ default Optional<List<T>> parseOptional(byte[] parseMessage) {
*/
boolean validate(T message);

default void configurationUpdated(Map<String, Object> config) {
}

}