This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Merged
15 changes: 0 additions & 15 deletions opensoc-streaming/.gitignore

This file was deleted.

14 changes: 0 additions & 14 deletions opensoc-streaming/.travis.yml

This file was deleted.

61 changes: 58 additions & 3 deletions opensoc-streaming/OpenSOC-Alerts/pom.xml
@@ -15,19 +15,21 @@
<parent>
<groupId>com.opensoc</groupId>
<artifactId>OpenSOC-Streaming</artifactId>
-<version>0.3BETA-SNAPSHOT</version>
+<version>0.6BETA</version>
</parent>
<artifactId>OpenSOC-Alerts</artifactId>
<name>OpenSOC-Alerts</name>
<description>Taggers for alerts</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<commons.validator.version>1.4.0</commons.validator.version>
</properties>
<dependencies>
<dependency>
<groupId>com.opensoc</groupId>
<artifactId>OpenSOC-Common</artifactId>
-<version>${parent.version}</version>
+<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
@@ -39,6 +41,12 @@
<artifactId>storm-core</artifactId>
<version>${global_storm_version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -69,13 +77,60 @@
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
<version>${commons.validator.version}</version>
<exclusions>
<exclusion>

<groupId>commons-beanutils</groupId>

<artifactId>commons-beanutils</artifactId>

</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18</version>
<configuration>
<systemProperties>
<property>
<name>mode</name>
<value>local</value>
</property>
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.3</version>
<configuration>
<targetJdk>1.7</targetJdk>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>emma-maven-plugin</artifactId>
<version>1.0-alpha-3</version>
<inherited>true</inherited>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</project>
104 changes: 104 additions & 0 deletions opensoc-streaming/OpenSOC-Alerts/readme.md
@@ -0,0 +1,104 @@
# OpenSOC-Alerts

## Module Description

This module enables telemetry alerts. It splits the message stream into two streams: the original message is emitted on the "message" stream, and the corresponding alert is emitted on the "alerts" stream. The two are tied together through the alert's UUID.

## Message Format

Assuming the original message (with enrichments enabled) has the following format:

```json
{
"message":
{"ip_src_addr": xxxx,
"ip_dst_addr": xxxx,
"ip_src_port": xxxx,
"ip_dst_port": xxxx,
"protocol": xxxx,
"timestamp": xxxx.
"original_string": xxxx,
"additional-field 1": xxxx,
},
"enrichment" : {"geo": xxxx, "whois": xxxx, "hosts": xxxxx, "CIF": "xxxxx"}

}
```

The telemetry message will be tagged with a UUID alert tag like so:

```json
{
"message":
{"ip_src_addr": xxxx,
"ip_dst_addr": xxxx,
"ip_src_port": xxxx,
"ip_dst_port": xxxx,
"protocol": xxxx,
"timestamp": xxxx,
"original_string": xxxx,
"additional-field 1": xxxx,
},
"enrichment" : {"geo": xxxx, "whois": xxxx, "hosts": xxxxx, "CIF": "xxxxx"},
"alerts": [UUID1, UUID2, UUID3, etc]

}
```

The alert will be fired on the "alerts" stream and can be customized to have any format as long as it includes the mandatory fields. The mandatory fields are:

* timestamp (epoch): The time from the message that triggered the alert
* description: A human friendly string representation of the alert
* alert_id: The UUID generated for the alert. This uniquely identifies an alert

There are other standard but not mandatory fields that can be leveraged by opensoc-ui and other alert consumers:

* designated_host: The IP address that corresponds to an asset, e.g. the IP address of the company device associated with the alert.
* enrichment: A copy of the enrichment data from the message that triggered the alert
* priority: The priority of the alert. Must be set to one of HIGH, MED, or LOW.

An example of an alert with all mandatory and standard fields would look like so:

```json
{
"timestamp": xxxx,
"alert_id": UUID,
"description": xxxx,
"designated_host": xxxx,
"enrichment": { "geo": xxxx, "whois": xxxx, "cif": xxxx },
"priority": "MED"
}
```

## Alerts Bolt

The bolt can be extended with a variety of alerts adapters. The ability to stack alerts adapters is in beta and is not yet advisable; we advise having only one alerts bolt per topology. The adapters are rules-based and fire alerts when their rules match. Currently only Java adapters are provided, but there are plans to provide Grok-based adapters as well.

The signature of the Alerts bolt is as follows:

```java
TelemetryAlertsBolt alerts_bolt = new TelemetryAlertsBolt()
.withIdentifier(alerts_identifier).withMaxCacheSize(1000)
.withMaxTimeRetain(3600).withAlertsAdapter(alerts_adapter)
.withMetricConfiguration(config);
```
* Identifier - JSON key where the alert is attached
* MaxTimeRetain & MaxCacheSize - caching parameters for the bolt (maximum number of cached entries and retention time in minutes)
* MetricConfiguration - export custom bolt metrics to Graphite (if not null)
* AlertsAdapter - pick the appropriate adapter for generating the alerts
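
For orientation, here is a minimal sketch of how the bolt might be wired into a Storm topology. The component names, parallelism hints, and the upstream spout and downstream bolts are illustrative assumptions; only the TelemetryAlertsBolt builder calls and the "message"/"alert" stream IDs come from this module's code.

```java
import backtype.storm.topology.TopologyBuilder;

// Hypothetical wiring sketch: telemetrySpout, indexingBolt, alertsIndexingBolt,
// alerts_identifier, alerts_adapter and config are assumed to be built elsewhere.
TopologyBuilder builder = new TopologyBuilder();

// Upstream component emitting enriched telemetry JSON (assumed).
builder.setSpout("telemetry_spout", telemetrySpout, 1);

TelemetryAlertsBolt alerts_bolt = new TelemetryAlertsBolt()
        .withIdentifier(alerts_identifier).withMaxCacheSize(1000)
        .withMaxTimeRetain(3600).withAlertsAdapter(alerts_adapter)
        .withMetricConfiguration(config);

builder.setBolt("alerts_bolt", alerts_bolt, 1)
        .shuffleGrouping("telemetry_spout");

// Downstream consumers subscribe to the bolt's named streams:
// tagged telemetry on "message", fired alerts on "alert".
builder.setBolt("message_indexing_bolt", indexingBolt, 1)
        .shuffleGrouping("alerts_bolt", "message");
builder.setBolt("alerts_indexing_bolt", alertsIndexingBolt, 1)
        .shuffleGrouping("alerts_bolt", "alert");
```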

### Java Adapters

Java adapters are designed for high-volume topologies, but are not easily extensible. The provided adapters are listed below; a sketch of a hypothetical custom adapter follows the list.

* com.opensoc.alerts.adapters.AllAlertsAdapter - will tag every single message with the static alert (appropriate for topologies like Sourcefire, where every single message is an alert)
* com.opensoc.alerts.adapters.HbaseWhiteAndBlacklistAdapter - will read whitelists and blacklists from HBase and fire alerts if the source or destination IP is not on the whitelist or if any IP is on the blacklist
* com.opensoc.alerts.adapters.CIFAlertsAdapter - will alert on messages that have results in enrichment.cif
* com.opensoc.alerts.adapters.KeywordsAlertAdapter - will alert on messages that contain any of a list of keywords
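
These adapters all implement the com.opensoc.alerts.interfaces.AlertsAdapter interface. The snippet below is a rough, hypothetical sketch of a custom rules-based adapter. The method names and signatures shown (initialize() and alert(...) returning a map of alert IDs to alert bodies) are inferred from how TelemetryAlertsBolt uses its adapter in this pull request, not copied from OpenSOC-Common, and the real interface may declare additional methods that an implementation would also have to provide.

```java
package com.opensoc.alerts.adapters;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.json.simple.JSONObject;

import com.opensoc.alerts.interfaces.AlertsAdapter;

/**
 * Hypothetical keyword-matching adapter; the AlertsAdapter method names used
 * here are assumptions inferred from the bolt code in this PR.
 */
public class ExampleKeywordAlertsAdapter implements AlertsAdapter, Serializable {

    private static final long serialVersionUID = 1L;
    private final String keyword;

    public ExampleKeywordAlertsAdapter(String keyword) {
        this.keyword = keyword;
    }

    public boolean initialize() {
        // Load rules or connect to external stores here if needed.
        return true;
    }

    @SuppressWarnings("unchecked")
    public Map<String, JSONObject> alert(JSONObject raw_message) {
        Map<String, JSONObject> alerts = new HashMap<String, JSONObject>();
        JSONObject message = (JSONObject) raw_message.get("message");

        if (message != null
                && String.valueOf(message.get("original_string")).contains(keyword)) {
            JSONObject alert = new JSONObject();
            // Mandatory fields from this README: timestamp, description, alert_id.
            alert.put("timestamp", message.get("timestamp"));
            alert.put("description", "Keyword '" + keyword + "' seen in telemetry");
            alert.put("alert_id", UUID.randomUUID().toString());
            // Standard (optional) field.
            alert.put("priority", "MED");
            alerts.put((String) alert.get("alert_id"), alert);
        }
        return alerts;
    }
}
```

An instance of such an adapter would then be passed to the bolt via withAlertsAdapter(...).
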
### Grok Adapters

Grok alerts adapters for OpenSOC are still under development.

### Stacking Alert Adapters

The functionality to stack alerts adapters is still under development.
@@ -54,8 +54,8 @@ public abstract class AbstractAlertBolt extends BaseRichBolt {
protected JSONObject _identifier;
protected MetricReporter _reporter;

-protected int _MAX_CACHE_SIZE = -1;
-protected int _MAX_TIME_RETAIN = -1;
+protected int _MAX_CACHE_SIZE_OBJECTS_NUM = -1;
+protected int _MAX_TIME_RETAIN_MINUTES = -1;

protected Counter ackCounter, emitCounter, failCounter;

@@ -82,10 +82,10 @@ public final void prepare(Map conf, TopologyContext topologyContext,
if (this._identifier == null)
throw new IllegalStateException("Identifier must be specified");

-if (this._MAX_CACHE_SIZE == -1)
-throw new IllegalStateException("MAX_CACHE_SIZE must be specified");
-if (this._MAX_TIME_RETAIN == -1)
-throw new IllegalStateException("MAX_TIME_RETAIN must be specified");
+if (this._MAX_CACHE_SIZE_OBJECTS_NUM == -1)
+throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
+if (this._MAX_TIME_RETAIN_MINUTES == -1)
+throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");

try {
doPrepare(conf, topologyContext, collector);
@@ -95,6 +95,7 @@ }
}

boolean success = _adapter.initialize();

try {
if (!success)

@@ -31,12 +31,11 @@
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

-import com.esotericsoftware.minlog.Log;
import com.google.common.cache.CacheBuilder;
import com.opensoc.alerts.interfaces.AlertsAdapter;
+import com.opensoc.helpers.topology.ErrorGenerator;
import com.opensoc.json.serialization.JSONEncoderHelper;
import com.opensoc.metrics.MetricReporter;
-import com.opensoc.topologyhelpers.ErrorGenerator;

@SuppressWarnings("rawtypes")
public class TelemetryAlertsBolt extends AbstractAlertBolt {
@@ -120,33 +119,33 @@ public TelemetryAlertsBolt withMetricConfiguration(Configuration config) {
}

/**
-* @param MAX_CACHE_SIZE
+* @param MAX_CACHE_SIZE_OBJECTS_NUM
* Maximum size of cache before flushing
* @return Instance of this class
*/

-public TelemetryAlertsBolt withMaxCacheSize(int MAX_CACHE_SIZE) {
-_MAX_CACHE_SIZE = MAX_CACHE_SIZE;
+public TelemetryAlertsBolt withMaxCacheSize(int MAX_CACHE_SIZE_OBJECTS_NUM) {
+_MAX_CACHE_SIZE_OBJECTS_NUM = MAX_CACHE_SIZE_OBJECTS_NUM;
return this;
}

/**
-* @param MAX_TIME_RETAIN
+* @param MAX_TIME_RETAIN_MINUTES
* Maximum time to retain cached entry before expiring
* @return Instance of this class
*/

-public TelemetryAlertsBolt withMaxTimeRetain(int MAX_TIME_RETAIN) {
-_MAX_TIME_RETAIN = MAX_TIME_RETAIN;
+public TelemetryAlertsBolt withMaxTimeRetain(int MAX_TIME_RETAIN_MINUTES) {
+_MAX_TIME_RETAIN_MINUTES = MAX_TIME_RETAIN_MINUTES;
return this;
}

@Override
void doPrepare(Map conf, TopologyContext topologyContext,
OutputCollector collector) throws IOException {

-cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE)
-.expireAfterWrite(_MAX_TIME_RETAIN, TimeUnit.MINUTES).build();
+cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES).build();

LOG.info("[OpenSOC] Preparing TelemetryAlert Bolt...");

@@ -185,22 +184,22 @@ public void execute(Tuple tuple) {
JSONArray uuid_list = new JSONArray();

if (alerts_list == null || alerts_list.isEmpty()) {
LOG.trace("[OpenSOC] No alerts detected in: "
System.out.println("[OpenSOC] No alerts detected in: "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we replace LOG.trace with System.out.println? Isn't that exactly the opposite of what we want?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a piece of debugging that someone forgot to comment out. We can ignore this

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

James, do you mean that this particular line will be reverted back to LOG.trace? We as a group should not have code in the master branch with System.out.*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok to have system.outs in the runners that launch the topology, but it shouldn't exist in the bolts themselves. you are correct. we should change it back to log.trace

+ original_message);
_collector.ack(tuple);
-_collector.emit(new Values(original_message));
+_collector.emit("message", new Values(key, original_message));
} else {
for (String alert : alerts_list.keySet()) {
uuid_list.add(alert);

LOG.trace("[OpenSOC] Checking alerts cache: " + alert);

if (cache.getIfPresent(alert) == null) {
LOG.trace("[OpenSOC]: Alert not found in cache: " + alert);
System.out.println("[OpenSOC]: Alert not found in cache: " + alert);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it just needs to be removed as it serves no purpose. the log is already logging the same line. there is no reason to print it out

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the log isn't logging that same line any more -- the diff shows that the call to the logger was replaced by System.out, which we probably want to switch back.


JSONObject global_alert = new JSONObject();
global_alert.putAll(_identifier);
global_alert.put("triggered", alerts_list.get(alert));
global_alert.putAll(alerts_list.get(alert));
global_alert.put("timestamp", System.currentTimeMillis());
_collector.emit("alert", new Values(global_alert));

@@ -244,11 +243,9 @@ public void execute(Tuple tuple) {
* if (metricConfiguration != null) { failCounter.inc(); }
*/

-String error_as_string = org.apache.commons.lang.exception.ExceptionUtils
-.getStackTrace(e);

JSONObject error = ErrorGenerator.generateErrorMessage(
-"Alerts problem: " + original_message, error_as_string);
+"Alerts problem: " + original_message, e);
_collector.emit("error", new Values(error));
}
}
@@ -19,8 +19,6 @@
package com.opensoc.alerts.adapters;

import java.io.Serializable;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@@ -62,9 +60,9 @@ private String makeKey(String ip1, String ip2, int alert_type) {
return (ip1 + "-" + ip2 + "-" + alert_type);
}

-private void generateCache(int _MAX_CACHE_SIZE, int _MAX_TIME_RETAIN)
+protected void generateCache(int _MAX_CACHE_SIZE_OBJECTS_NUM, int _MAX_TIME_RETAIN_MINUTES)
{
-cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE)
-.expireAfterWrite(_MAX_TIME_RETAIN, TimeUnit.MINUTES).build();
+cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES).build();
}
}