Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions metron-platform/metron-common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,28 @@ The configuration is a complex JSON object with the following top level fields:
###The `enrichment` Configuration


| Field | Description | Example |
|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------|
| Field | Description | Example |
|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------|
| `fieldToTypeMap` | In the case of a simple HBase enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known. This enrichment type is used as part of the HBase key. | `"fieldToTypeMap" : { "ip_src_addr" : [ "asset_enrichment" ] }` |
| `fieldMap` | The map of enrichment bolts names to fields in the JSON messages.,Each field is sent to the enrichment referenced in the key. | `"fieldMap": {"hbaseEnrichment": ["ip_src_addr","ip_dst_addr"]}` |
| `fieldMap` | The map of enrichment bolts names to fields in the JSON messages.,Each field is sent to the enrichment referenced in the key. | `"fieldMap": {"hbaseEnrichment": ["ip_src_addr","ip_dst_addr"]}` |
| `config` | The general configuration for the enrichment | `"config": {"typeToColumnFamily": { "asset_enrichment","cf" } }` |

The `config` map is intended to house enrichment specific configuration.
For instance, for the `hbaseEnrichment`, the mappings between the
enrichment types to the column families is specified.

###The `threatIntel` Configuration

| Field | Description | Example |
|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
| Field | Description | Example |
|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
| `fieldToTypeMap` | In the case of a simple HBase threat intel enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known. This enrichment type is used as part of the HBase key. | `"fieldToTypeMap" : { "ip_src_addr" : [ "malicious_ips" ] }` |
| `fieldMap` | The map of threat intel enrichment bolts names to fields in the JSON messages. Each field is sent to the threat intel enrichment bolt referenced in the key. | `"fieldMap": {"hbaseThreatIntel": ["ip_src_addr","ip_dst_addr"]}` |
| `triageConfig` | The configuration of the threat triage scorer. In the situation where a threat is detected, a score is assigned to the message and embedded in the indexed message. | `"riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 }` |
| `triageConfig` | The configuration of the threat triage scorer. In the situation where a threat is detected, a score is assigned to the message and embedded in the indexed message. | `"riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 }` |
| `config` | The general configuration for the Threat Intel | `"config": {"typeToColumnFamily": { "malicious_ips","cf" } }` |

The `config` map is intended to house threat intel specific configuration.
For instance, for the `hbaseThreatIntel` threat intel adapter, the mappings between the
enrichment types to the column families is specified.

The `triageConfig` field is also a complex field and it bears some description:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@
public class EnrichmentConfig {
private Map<String, List<String>> fieldMap = new HashMap<>();
private Map<String, List<String>> fieldToTypeMap = new HashMap<>();
private Map<String, Object> config = new HashMap<>();

public Map<String, Object> getConfig() {
return config;
}

public void setConfig(Map<String, Object> config) {
this.config = config;
}

public Map<String, List<String>> getFieldMap() {
return fieldMap;
Expand All @@ -42,6 +51,15 @@ public void setFieldToTypeMap(Map<String, List<String>> fieldToTypeMap) {
this.fieldToTypeMap = fieldToTypeMap;
}

@Override
public String toString() {
return "EnrichmentConfig{" +
"fieldMap=" + fieldMap +
", fieldToTypeMap=" + fieldToTypeMap +
", config=" + config +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -50,22 +68,17 @@ public boolean equals(Object o) {
EnrichmentConfig that = (EnrichmentConfig) o;

if (getFieldMap() != null ? !getFieldMap().equals(that.getFieldMap()) : that.getFieldMap() != null) return false;
return getFieldToTypeMap() != null ? getFieldToTypeMap().equals(that.getFieldToTypeMap()) : that.getFieldToTypeMap() == null;
if (getFieldToTypeMap() != null ? !getFieldToTypeMap().equals(that.getFieldToTypeMap()) : that.getFieldToTypeMap() != null)
return false;
return getConfig() != null ? getConfig().equals(that.getConfig()) : that.getConfig() == null;

}

@Override
public int hashCode() {
int result = getFieldMap() != null ? getFieldMap().hashCode() : 0;
result = 31 * result + (getFieldToTypeMap() != null ? getFieldToTypeMap().hashCode() : 0);
result = 31 * result + (getConfig() != null ? getConfig().hashCode() : 0);
return result;
}

@Override
public String toString() {
return "EnrichmentConfig{" +
"fieldMap=" + fieldMap +
", fieldToTypeMap=" + fieldToTypeMap +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker;
import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
import org.apache.metron.dataloads.nonbulk.taxii.TaxiiLoader;
import org.apache.metron.enrichment.lookup.handler.KeyWithContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void test() throws Exception {
)
)
);
Assert.assertTrue(lookup.exists((EnrichmentKey)k, testTable, true));
Assert.assertTrue(lookup.exists((EnrichmentKey)k, new EnrichmentLookup.HBaseContext(testTable, cf), true));
}
pat.persist(true);
for(LookupKey k : goodKeysOtherHalf) {
Expand All @@ -129,7 +130,7 @@ public void test() throws Exception {
)
)
);
Assert.assertTrue(lookup.exists((EnrichmentKey)k, testTable, true));
Assert.assertTrue(lookup.exists((EnrichmentKey)k, new EnrichmentLookup.HBaseContext(testTable, cf), true));
}
testUtil.flush();
Assert.assertFalse(lookup.getAccessTracker().hasSeen(goodKeysHalf.get(0)));
Expand All @@ -153,10 +154,10 @@ public void test() throws Exception {
Job job = LeastRecentlyUsedPruner.createJob(config, tableName, cf, atTableName, atCF, ts);
Assert.assertTrue(job.waitForCompletion(true));
for(LookupKey k : goodKeys) {
Assert.assertTrue(lookup.exists((EnrichmentKey)k, testTable, true));
Assert.assertTrue(lookup.exists((EnrichmentKey)k, new EnrichmentLookup.HBaseContext(testTable, cf), true));
}
for(LookupKey k : badKey) {
Assert.assertFalse(lookup.exists((EnrichmentKey)k, testTable, true));
Assert.assertFalse(lookup.exists((EnrichmentKey)k, new EnrichmentLookup.HBaseContext(testTable, cf), true));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ public JSONObject enrich(CacheKey value) {
try {
for (LookupKV<EnrichmentKey, EnrichmentValue> kv :
lookup.get(Iterables.transform(enrichmentTypes
, new EnrichmentUtils.TypeToKey(value.getValue())
, new EnrichmentUtils.TypeToKey( value.getValue()
, lookup.getTable()
, value.getConfig().getEnrichment()
)
)
, lookup.getTable()
, false
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ public JSONObject enrich(CacheKey value) {
try {
for (Boolean isThreat :
lookup.exists(Iterables.transform(enrichmentTypes
, new EnrichmentUtils.TypeToKey(value.getValue())
, new EnrichmentUtils.TypeToKey(value.getValue()
, lookup.getTable()
, value.getConfig().getThreatIntel()
)
)
, lookup.getTable()
, false
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.metron.enrichment.lookup;

import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -25,56 +26,85 @@
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
import org.apache.metron.enrichment.lookup.handler.KeyWithContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;


public class EnrichmentLookup extends Lookup<HTableInterface, EnrichmentKey, LookupKV<EnrichmentKey,EnrichmentValue>> implements AutoCloseable {
public class EnrichmentLookup extends Lookup<EnrichmentLookup.HBaseContext, EnrichmentKey, LookupKV<EnrichmentKey,EnrichmentValue>> implements AutoCloseable {

public static class Handler implements org.apache.metron.enrichment.lookup.handler.Handler<HTableInterface,EnrichmentKey,LookupKV<EnrichmentKey,EnrichmentValue>> {
public static class HBaseContext {
private HTableInterface table;
private String columnFamily;
public HBaseContext(HTableInterface table, String columnFamily) {
this.table = table;
this.columnFamily = columnFamily;
}

public HTableInterface getTable() { return table; }
public String getColumnFamily() { return columnFamily; }
}

public static class Handler implements org.apache.metron.enrichment.lookup.handler.Handler<HBaseContext,EnrichmentKey,LookupKV<EnrichmentKey,EnrichmentValue>> {
String columnFamily;
HbaseConverter<EnrichmentKey, EnrichmentValue> converter = new EnrichmentConverter();
public Handler(String columnFamily) {
this.columnFamily = columnFamily;
}

private String getColumnFamily(HBaseContext context) {
return context.getColumnFamily() == null?columnFamily:context.getColumnFamily();
}

@Override
public boolean exists(EnrichmentKey key, HTableInterface table, boolean logAccess) throws IOException {
return table.exists(converter.toGet(columnFamily, key));
public boolean exists(EnrichmentKey key, HBaseContext context, boolean logAccess) throws IOException {
return context.getTable().exists(converter.toGet(getColumnFamily(context), key));
}

@Override
public LookupKV<EnrichmentKey, EnrichmentValue> get(EnrichmentKey key, HTableInterface table, boolean logAccess) throws IOException {
return converter.fromResult(table.get(converter.toGet(columnFamily, key)), columnFamily);
public LookupKV<EnrichmentKey, EnrichmentValue> get(EnrichmentKey key, HBaseContext context, boolean logAccess) throws IOException {
return converter.fromResult(context.getTable().get(converter.toGet(getColumnFamily(context), key)), getColumnFamily(context));
}

private List<Get> keysToGets(Iterable<EnrichmentKey> keys) {
private List<Get> keysToGets(Iterable<KeyWithContext<EnrichmentKey, HBaseContext>> keys) {
List<Get> ret = new ArrayList<>();
for(EnrichmentKey key : keys) {
ret.add(converter.toGet(columnFamily, key));
for(KeyWithContext<EnrichmentKey, HBaseContext> key : keys) {
ret.add(converter.toGet(getColumnFamily(key.getContext()), key.getKey()));
}
return ret;
}

@Override
public Iterable<Boolean> exists(Iterable<EnrichmentKey> key, HTableInterface table, boolean logAccess) throws IOException {
public Iterable<Boolean> exists(Iterable<KeyWithContext<EnrichmentKey, HBaseContext>> key, boolean logAccess) throws IOException {
List<Boolean> ret = new ArrayList<>();
if(Iterables.isEmpty(key)) {
return Collections.emptyList();
}
HTableInterface table = Iterables.getFirst(key, null).getContext().getTable();
for(boolean b : table.existsAll(keysToGets(key))) {
ret.add(b);
}
return ret;
}

@Override
public Iterable<LookupKV<EnrichmentKey, EnrichmentValue>> get( Iterable<EnrichmentKey> keys
, HTableInterface table
public Iterable<LookupKV<EnrichmentKey, EnrichmentValue>> get( Iterable<KeyWithContext<EnrichmentKey, HBaseContext>> keys
, boolean logAccess
) throws IOException
{
if(Iterables.isEmpty(keys)) {
return Collections.emptyList();
}
HTableInterface table = Iterables.getFirst(keys, null).getContext().getTable();
List<LookupKV<EnrichmentKey, EnrichmentValue>> ret = new ArrayList<>();
Iterator<KeyWithContext<EnrichmentKey, HBaseContext>> keyWithContextIterator = keys.iterator();
for(Result result : table.get(keysToGets(keys))) {
ret.add(converter.fromResult(result, columnFamily));
HBaseContext context = keyWithContextIterator.next().getContext();
ret.add(converter.fromResult(result, getColumnFamily(context)));
}
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
import org.apache.metron.enrichment.lookup.handler.Handler;
import org.apache.metron.enrichment.lookup.handler.KeyWithContext;

import java.io.IOException;

Expand Down Expand Up @@ -68,24 +69,24 @@ public RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOEx
}

@Override
public Iterable<Boolean> exists(Iterable<KEY_T> key, CONTEXT_T context, boolean logAccess) throws IOException {
public Iterable<Boolean> exists(Iterable<KeyWithContext<KEY_T, CONTEXT_T>> key, boolean logAccess) throws IOException {
if(logAccess) {
for (KEY_T k : key) {
accessTracker.logAccess(k);
for (KeyWithContext<KEY_T, CONTEXT_T> k : key) {
accessTracker.logAccess(k.getKey());
}
}
return lookupHandler.exists(key, context, logAccess);
return lookupHandler.exists(key, logAccess);
}


@Override
public Iterable<RESULT_T> get(Iterable<KEY_T> key, CONTEXT_T context, boolean logAccess) throws IOException {
public Iterable<RESULT_T> get(Iterable<KeyWithContext<KEY_T, CONTEXT_T>> key, boolean logAccess) throws IOException {
if(logAccess) {
for (KEY_T k : key) {
accessTracker.logAccess(k);
for (KeyWithContext<KEY_T, CONTEXT_T> k : key) {
accessTracker.logAccess(k.getKey());
}
}
return lookupHandler.get(key, context, logAccess);
return lookupHandler.get(key, logAccess);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
public interface Handler<CONTEXT_T, KEY_T extends LookupKey, RESULT_T> extends AutoCloseable{
boolean exists(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException;
RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException;
Iterable<Boolean> exists(Iterable<KEY_T> key, CONTEXT_T context, boolean logAccess) throws IOException;
Iterable<RESULT_T> get(Iterable<KEY_T> key, CONTEXT_T context, boolean logAccess) throws IOException;
Iterable<Boolean> exists(Iterable<KeyWithContext<KEY_T, CONTEXT_T>> key, boolean logAccess) throws IOException;
Iterable<RESULT_T> get(Iterable<KeyWithContext<KEY_T, CONTEXT_T>> key, boolean logAccess) throws IOException;
}
Loading