diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index 1781225d6f..74a41b8b03 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -206,7 +206,7 @@ ) # Metron HBase configuration -enrichment_hbase_provider_impl = 'org.apache.metron.hbase.HTableProvider' +enrichment_lookup_factory = 'HBASE' enrichment_hbase_table = status_params.enrichment_hbase_table enrichment_hbase_cf = status_params.enrichment_hbase_cf # coprocessor config for enrichment list diff --git a/metron-platform/metron-data-management/pom.xml b/metron-platform/metron-data-management/pom.xml index 3e67c59288..415fc57913 100644 --- a/metron-platform/metron-data-management/pom.xml +++ b/metron-platform/metron-data-management/pom.xml @@ -109,6 +109,13 @@ + + org.apache.metron + metron-enrichment-common + ${project.parent.version} + test-jar + test + org.apache.metron metron-hbase-common diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java index 82b4d3a313..93d9019618 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java @@ -17,12 +17,14 @@ */ package org.apache.metron.dataloads.hbase.mr; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.metron.enrichment.lookup.LookupKey; import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker; import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackerUtil; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.IOException; @@ -31,22 +33,32 @@ public class PrunerMapper extends TableMapper { public static final String ACCESS_TRACKER_CF_CONF = "access_tracker_cf"; public static final String TIMESTAMP_CONF = "access_tracker_timestamp"; public static final String ACCESS_TRACKER_NAME_CONF = "access_tracker_name"; - AccessTracker tracker; + private AccessTracker tracker; + private HBaseConnectionFactory connectionFactory; + + public PrunerMapper() { + this.connectionFactory = new HBaseConnectionFactory(); + } + @Override - public void setup(Context context) throws IOException - { + public void setup(Context context) throws IOException { String atTable = context.getConfiguration().get(ACCESS_TRACKER_TABLE_CONF); String atCF = context.getConfiguration().get(ACCESS_TRACKER_CF_CONF); String atName = context.getConfiguration().get(ACCESS_TRACKER_NAME_CONF); - HTable table = new HTable(context.getConfiguration(), atTable); - long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1); - if(timestamp < 0) { - throw new IllegalStateException("Must specify a timestamp that is positive."); - } - try { - tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp)); - } catch (Throwable e) { - throw new IllegalStateException("Unable to load the accesstrackers from the directory", e); + + // setup the HBase connection + try(Connection connection = connectionFactory.createConnection(context.getConfiguration()); + Table table = connection.getTable(TableName.valueOf(atTable))) { + + long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1); + if(timestamp < 0) { + throw new IllegalStateException("Must specify a timestamp that is positive."); + } + try { + tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp)); + } catch (Throwable e) { + throw new IllegalStateException("Unable to load the accesstrackers from the directory", e); + } } } diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java index f0ee3ad7d6..fbf3bf3e54 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java @@ -19,20 +19,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.metron.dataloads.extractor.Extractor; import org.apache.metron.enrichment.converter.HbaseConverter; import java.io.IOException; public class HBaseExtractorState { - private HTableInterface table; + private Table table; private Extractor extractor; private HbaseConverter converter; private FileSystem fs; private String cf; - public HBaseExtractorState(HTableInterface table, String cf, Extractor extractor, HbaseConverter converter, Configuration config) { + public HBaseExtractorState(Table table, String cf, Extractor extractor, HbaseConverter converter, Configuration config) { this.table = table; this.extractor = extractor; this.converter = converter; @@ -48,7 +48,7 @@ public String getCf() { return cf; } - public HTableInterface getTable() { + public Table getTable() { return table; } diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java index 1709931da9..cd3f7a4ac0 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java @@ -19,14 +19,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.metron.common.utils.cli.CLIOptions; import org.apache.metron.common.utils.file.ReaderSpliterator; import org.apache.metron.dataloads.extractor.ExtractorHandler; import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat; -import org.apache.metron.common.utils.cli.CLIOptions; -import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions; import org.apache.metron.dataloads.nonbulk.flatfile.location.Location; import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy; import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.BufferedReader; import java.io.IOException; @@ -40,11 +41,21 @@ public abstract class AbstractLocalImporter & CLIOptions, STATE_T> implements Importer { + private HBaseConnectionFactory connectionFactory; + private Connection connection; + + public AbstractLocalImporter(HBaseConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + @Override public void importData( final EnumMap> config , final ExtractorHandler handler , final Configuration hadoopConfig ) throws IOException, InvalidWriterOutput { + + connection = connectionFactory.createConnection(hadoopConfig); + validateState(config, handler); ThreadLocal state = createState(config, hadoopConfig, handler); boolean quiet = isQuiet(config); @@ -64,6 +75,17 @@ public void importData( final EnumMap> config } } + protected Connection getConnection() { + return connection; + } + + @Override + public void close() throws IOException { + if(connection != null) { + connection.close(); + } + } + protected abstract List getInputs(final EnumMap> config); protected abstract boolean isQuiet(final EnumMap> config); protected abstract int batchSize(final EnumMap> config); diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java index 0c7faf686f..b915781b69 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java @@ -19,17 +19,14 @@ package org.apache.metron.dataloads.nonbulk.flatfile.importer; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.metron.dataloads.extractor.ExtractorHandler; -import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions; import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput; -import org.apache.metron.enrichment.converter.EnrichmentConverter; +import java.io.Closeable; import java.io.IOException; import java.util.EnumMap; -import java.util.List; import java.util.Optional; -public interface Importer> { +public interface Importer> extends Closeable { void importData(EnumMap> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException, InvalidWriterOutput; } diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java index ec37585d40..2c0bff0807 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java @@ -18,8 +18,9 @@ package org.apache.metron.dataloads.nonbulk.flatfile.importer; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.metron.dataloads.extractor.Extractor; import org.apache.metron.dataloads.extractor.ExtractorHandler; import org.apache.metron.dataloads.nonbulk.flatfile.HBaseExtractorState; @@ -27,25 +28,22 @@ import org.apache.metron.enrichment.converter.EnrichmentConverter; import org.apache.metron.enrichment.converter.HbaseConverter; import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.hbase.HTableProvider; +import org.apache.metron.hbase.client.HBaseConnectionFactory; -import java.io.*; -import java.util.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Optional; public class LocalImporter extends AbstractLocalImporter { - public interface HTableProviderRetriever { - HTableProvider retrieve(); - } - - HTableProviderRetriever provider; - - public LocalImporter(HTableProviderRetriever provider) { - this.provider = provider; + public LocalImporter(HBaseConnectionFactory connectionFactory) { + super(connectionFactory); } public LocalImporter() { - this(() -> new HTableProvider()); + this(new HBaseConnectionFactory()); } @@ -86,8 +84,9 @@ protected ThreadLocal createState(EnumMap stateList; public LocalSummarizer() { + super(new HBaseConnectionFactory()); stateList = Collections.synchronizedList(new ArrayList<>()); } diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java index 1b34ed48a1..aa0effdbf9 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java @@ -75,4 +75,9 @@ public void importData(EnumMap> config throw new IllegalStateException("Unable to complete job: " + e.getMessage(), e); } } + + @Override + public void close() { + // nothing to do + } } diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java index de795c5864..e96693e28f 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java @@ -18,35 +18,8 @@ package org.apache.metron.dataloads.nonbulk.taxii; -import java.io.IOException; -import java.io.StringWriter; -import java.lang.invoke.MethodHandles; -import java.net.URI; -import java.net.URL; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimerTask; -import javax.xml.XMLConstants; -import javax.xml.bind.JAXBException; -import javax.xml.datatype.DatatypeConfigurationException; -import javax.xml.datatype.DatatypeFactory; -import javax.xml.datatype.XMLGregorianCalendar; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -68,10 +41,14 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.metron.common.utils.RuntimeErrors; import org.apache.metron.dataloads.extractor.Extractor; -import org.apache.metron.enrichment.converter.EnrichmentConverter; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.lookup.LookupKV; +import org.apache.metron.hbase.ColumnList; +import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.HBaseClientFactory; +import org.apache.metron.hbase.client.HBaseConnectionFactory; +import org.apache.metron.hbase.client.HBaseTableClientFactory; import org.mitre.taxii.client.HttpClient; import org.mitre.taxii.messages.xml11.AnyMixedContentType; import org.mitre.taxii.messages.xml11.CollectionInformationRequest; @@ -93,6 +70,32 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBException; +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; +import javax.xml.datatype.XMLGregorianCalendar; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.IOException; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimerTask; + public class TaxiiHandler extends TimerTask { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -116,21 +119,29 @@ protected ObjectFactory initialValue() { private Extractor extractor; private String hbaseTable; private String columnFamily; - private Map connectionCache = new HashMap<>(); + private Map connectionCache = new HashMap<>(); private HttpClientContext context; private String collection; private String subscriptionId; - private EnrichmentConverter converter = new EnrichmentConverter(); private Date beginTime; private Configuration config; private boolean inProgress = false; private Set allowedIndicatorTypes; + private HBaseClientFactory hBaseClientFactory; + private HBaseConnectionFactory hBaseConnectionFactory; public TaxiiHandler( TaxiiConnectionConfig connectionConfig - , Extractor extractor - , Configuration config - ) throws Exception - { + , Extractor extractor + , Configuration config + ) throws Exception { + this(connectionConfig, extractor, new HBaseTableClientFactory(), config); + } + + public TaxiiHandler( TaxiiConnectionConfig connectionConfig + , Extractor extractor + , HBaseClientFactory hBaseClientFactory + , Configuration config + ) throws Exception { LOG.info("Loading configuration: {}", connectionConfig); this.allowedIndicatorTypes = connectionConfig.getAllowedIndicatorTypes(); this.extractor = extractor; @@ -143,22 +154,31 @@ public TaxiiHandler( TaxiiConnectionConfig connectionConfig this.proxy = connectionConfig.getProxy(); this.username = connectionConfig.getUsername(); this.password = connectionConfig.getPassword(); + this.hBaseClientFactory = hBaseClientFactory; + this.hBaseConnectionFactory = new HBaseConnectionFactory(); initializeClient(connectionConfig); LOG.info("Configured, starting polling {} for {}", endpoint, collection); } - protected synchronized HTableInterface getTable(String table) throws IOException { - HTableInterface ret = connectionCache.get(table); - if(ret == null) { - ret = createHTable(table); - connectionCache.put(table, ret); + protected synchronized HBaseClient getHBaseClient(String tableName) { + HBaseClient client = connectionCache.get(tableName); + if(client == null) { + client = hBaseClientFactory.create(hBaseConnectionFactory, config, tableName); + connectionCache.put(tableName, client); } - return ret; + return client; } - protected synchronized HTableInterface createHTable(String tableInfo) throws IOException { - return new HTable(config, tableInfo); + protected synchronized void cleanup() { + for(HBaseClient client: connectionCache.values()) { + try { + client.close(); + } catch(IOException e) { + LOG.error("Error while closing HBase client", e); + } + } } + /** * The action to be performed by this timer task. */ @@ -221,9 +241,9 @@ public void run() { kv.getValue().getMetadata().put("source_type", "taxii"); kv.getValue().getMetadata().put("taxii_url", endpoint.toString()); kv.getValue().getMetadata().put("taxii_collection", collection); - Put p = converter.toPut(columnFamily, kv.getKey(), kv.getValue()); - HTableInterface table = getTable(hbaseTable); - table.put(p); + + HBaseClient hBaseClient = getHBaseClient(hbaseTable); + write(hBaseClient, kv.getKey(), kv.getValue()); LOG.info("Found Threat Intel: {} => ", kv.getKey(), kv.getValue()); } } @@ -245,12 +265,22 @@ public void run() { finally { inProgress = false; beginTime = ts; + cleanup(); } } - public String getStringFromDocument(Document doc) - { - try - { + + private void write(HBaseClient hBaseClient, EnrichmentKey key, EnrichmentValue value) { + final byte[] columnFamilyBytes = Bytes.toBytes(columnFamily); + ColumnList columns = new ColumnList(); + for(Map.Entry kv : value.toColumns()) { + columns.addColumn(columnFamilyBytes, kv.getKey(), kv.getValue()); + } + hBaseClient.addMutation(key.toBytes(), columns); + hBaseClient.mutate(); + } + + public String getStringFromDocument(Document doc) { + try { DOMSource domSource = new DOMSource(doc); StringWriter writer = new StringWriter(); StreamResult result = new StreamResult(writer); diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java index d82be9d8f4..df3ccdccca 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java @@ -23,8 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner; @@ -35,8 +35,12 @@ import org.apache.metron.enrichment.lookup.LookupKey; import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker; import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import org.apache.metron.test.utils.UnitTestHelper; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; import java.util.ArrayList; import java.util.HashMap; @@ -49,8 +53,8 @@ public class LeastRecentlyUsedPrunerIntegrationTest { private static HBaseTestingUtility testUtil; /** The test table. */ - private static HTable testTable; - private static HTable atTable; + private static Table testTable; + private static Table atTable; private static final String tableName = "malicious_domains"; private static final String cf = "cf"; private static final String atTableName = "access_trackers"; @@ -65,8 +69,8 @@ public static void setup() throws Exception { Map.Entry kv = HBaseUtil.INSTANCE.create(true); config = kv.getValue(); testUtil = kv.getKey(); - testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf)); - atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF)); + testTable = testUtil.createTable(TableName.valueOf(tableName), cf); + atTable = testUtil.createTable(TableName.valueOf(atTableName), atCF); } @AfterClass @@ -102,7 +106,8 @@ public void testCommandLine() throws Exception { public void test() throws Exception { long ts = System.currentTimeMillis(); BloomAccessTracker bat = new BloomAccessTracker("tracker1", 100, 0.03); - PersistentAccessTracker pat = new PersistentAccessTracker(tableName, "0", atTable, atCF, bat, 0L); + PersistentAccessTracker pat = new PersistentAccessTracker(tableName, "0", atTable.getName().getNameAsString(), + atCF, bat, 0L, new HBaseConnectionFactory(), testUtil.getConfiguration()); EnrichmentLookup lookup = new EnrichmentLookup(testTable, cf, pat); List goodKeysHalf = getKeys(0, 5); List goodKeysOtherHalf = getKeys(5, 10); diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java index 443d39dae7..727d014e31 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java @@ -26,10 +26,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.metron.common.configuration.ConfigurationsUtils; @@ -66,7 +67,7 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest { private static HBaseTestingUtility testUtil; /** The test table. */ - private static HTable testTable; + private static Table testTable; private static Configuration config = null; private static TestingServer testZkServer; private static String zookeeperUrl; @@ -190,7 +191,7 @@ public static void setup() throws Exception { Map.Entry kv = HBaseUtil.INSTANCE.create(true); config = kv.getValue(); testUtil = kv.getKey(); - testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf)); + testTable = testUtil.createTable(TableName.valueOf(tableName), cf); zookeeperUrl = getZookeeperUrl(config.get("hbase.zookeeper.quorum"), testUtil.getZkCluster().getClientPort()); setupGlobalConfig(zookeeperUrl); diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java index a54c21b78c..a8b31f61b8 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java @@ -18,29 +18,29 @@ package org.apache.metron.dataloads.nonbulk.taxii; -import com.google.common.base.Splitter; import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.PosixParser; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.metron.dataloads.extractor.Extractor; import org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator; import org.apache.metron.dataloads.extractor.stix.StixExtractor; -import org.apache.metron.enrichment.converter.EnrichmentConverter; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; -import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.hbase.mock.MockHTable; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.junit.*; +import org.apache.metron.hbase.ColumnList; +import org.apache.metron.hbase.client.FakeHBaseClient; +import org.apache.metron.hbase.client.FakeHBaseClientFactory; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class TaxiiIntegrationTest { @@ -52,18 +52,17 @@ public static void setup() throws IOException { @AfterClass public static void teardown() { MockTaxiiService.shutdown(); - MockHBaseTableProvider.clear(); } /** - { - "endpoint" : "http://localhost:8282/taxii-discovery-service" - ,"type" : "DISCOVER" - ,"collection" : "guest.Abuse_ch" - ,"table" : "threat_intel" - ,"columnFamily" : "cf" - ,"allowedIndicatorTypes" : [ "domainname:FQDN", "address:IPV_4_ADDR" ] - } + * { + * "endpoint": "http://localhost:8282/taxii-discovery-service", + * "type": "DISCOVER", + * "collection": "guest.Abuse_ch", + * "table": "threat_intel", + * "columnFamily": "cf", + * "allowedIndicatorTypes": [ "domainname:FQDN", "address:IPV_4_ADDR" ] + * } */ @Multiline static String taxiiConnectionConfig; @@ -93,57 +92,66 @@ public void testCommandLine() throws Exception { @Test public void testTaxii() throws Exception { + // delete any existing records + FakeHBaseClient fakeHBaseClient = new FakeHBaseClient(); + fakeHBaseClient.deleteAll(); - final MockHBaseTableProvider provider = new MockHBaseTableProvider(); + // setup the handler final Configuration config = HBaseConfiguration.create(); Extractor extractor = new TransformFilterExtractorDecorator(new StixExtractor()); - TaxiiHandler handler = new TaxiiHandler(TaxiiConnectionConfig.load(taxiiConnectionConfig), extractor, config ) { - @Override - protected synchronized HTableInterface createHTable(String tableInfo) throws IOException { - return provider.addToCache("threat_intel", "cf"); - } - }; - //UnitTestHelper.verboseLogging(); + TaxiiHandler handler = new TaxiiHandler( + TaxiiConnectionConfig.load(taxiiConnectionConfig), + extractor, + new FakeHBaseClientFactory(), + config); handler.run(); - Set maliciousDomains; + + // retrieve the data written to HBase by the taxii handler + Map results = new HashMap<>(); + List mutations = fakeHBaseClient.getAllPersisted(); + for(FakeHBaseClient.Mutation mutation: mutations) { + + // build the enrichment key + EnrichmentKey key = new EnrichmentKey("taxii", "et"); + key.fromBytes(mutation.rowKey); + + // expect only 1 column + List columns = mutation.columnList.getColumns(); + Assert.assertEquals(1, columns.size()); + ColumnList.Column column = columns.get(0); + + // build the enrichment value + EnrichmentValue value = new EnrichmentValue(); + value.fromColumn(column.getQualifier(), column.getValue()); + results.put(key, value); + } + + // validate the extracted taxii data + Assert.assertEquals(2, results.size()); { - MockHTable table = (MockHTable) provider.getTable(config, "threat_intel"); - maliciousDomains = getIndicators("domainname:FQDN", table.getPutLog(), "cf"); + EnrichmentKey key = new EnrichmentKey("address:IPV_4_ADDR", "94.102.53.142"); + Assert.assertTrue(results.containsKey(key)); + EnrichmentValue value = results.get(key); + Assert.assertEquals(6, value.getMetadata().size()); + Assert.assertEquals("guest.Abuse_ch", value.getMetadata().get("taxii_collection")); + Assert.assertEquals("STIX", value.getMetadata().get("source-type")); + Assert.assertEquals("address:IPV_4_ADDR", value.getMetadata().get("indicator-type")); + Assert.assertEquals("taxii", value.getMetadata().get("source_type")); + Assert.assertEquals("http://localhost:8282/taxii-data", value.getMetadata().get("taxii_url")); } - Assert.assertTrue(maliciousDomains.contains("www.office-112.com")); - Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "DomainNameObj:Value condition=\"Equals\""), maliciousDomains.size()); - Set maliciousAddresses; { - MockHTable table = (MockHTable) provider.getTable(config, "threat_intel"); - maliciousAddresses= getIndicators("address:IPV_4_ADDR", table.getPutLog(), "cf"); + EnrichmentKey key = new EnrichmentKey("domainname:FQDN", "www.office-112.com"); + Assert.assertTrue(results.containsKey(key)); + EnrichmentValue value = results.get(key); + Assert.assertEquals(6, value.getMetadata().size()); + Assert.assertEquals("guest.Abuse_ch", value.getMetadata().get("taxii_collection")); + Assert.assertEquals("STIX", value.getMetadata().get("source-type")); + Assert.assertEquals("domainname:FQDN", value.getMetadata().get("indicator-type")); + Assert.assertEquals("taxii", value.getMetadata().get("source_type")); + Assert.assertEquals("http://localhost:8282/taxii-data", value.getMetadata().get("taxii_url")); } - Assert.assertTrue(maliciousAddresses.contains("94.102.53.142")); - Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "AddressObj:Address_Value condition=\"Equal\""), maliciousAddresses.size()); - MockHBaseTableProvider.clear(); // Ensure that the handler can be run multiple times without connection issues. handler.run(); } - - private static int numStringsMatch(String xmlBundle, String text) { - int cnt = 0; - for(String line : Splitter.on("\n").split(xmlBundle)) { - if(line.contains(text)) { - cnt++; - } - } - return cnt; - } - - private static Set getIndicators(String indicatorType, Iterable puts, String cf) throws IOException { - EnrichmentConverter converter = new EnrichmentConverter(); - Set ret = new HashSet<>(); - for(Put p : puts) { - LookupKV kv = converter.fromPut(p, cf); - if (kv.getKey().type.equals(indicatorType)) { - ret.add(kv.getKey().indicator); - } - } - return ret; - } } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-common/pom.xml index 77ec1af086..054c3fe23b 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/pom.xml +++ b/metron-platform/metron-enrichment/metron-enrichment-common/pom.xml @@ -42,6 +42,11 @@ stellar-common ${project.parent.version} + + org.apache.metron + metron-hbase-common + ${project.parent.version} + @@ -110,6 +115,37 @@ + + org.apache.metron + metron-hbase-common + ${project.parent.version} + test + test-jar + + + org.apache.metron + metron-integration-test + ${project.parent.version} + test + + + servlet-api + javax.servlet + + + org.apache.hadoop + hadoop-common + + + org.hamcrest + hamcrest-core + + + org.slf4j + slf4j-log4j12 + + + diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java index b5bc20b525..64776af323 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java @@ -18,31 +18,33 @@ package org.apache.metron.enrichment.adapters.cif; -import java.io.IOException; -import java.io.Serializable; -import java.lang.invoke.MethodHandles; -import java.util.HashMap; -import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.metron.enrichment.cache.CacheKey; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; + @SuppressWarnings("unchecked") public class CIFHbaseAdapter implements EnrichmentAdapter,Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final long serialVersionUID = 1L; private String _tableName; - private HTableInterface table; + private Table table; private String _quorum; private String _port; @@ -70,7 +72,6 @@ public JSONObject enrich(CacheKey k) { @SuppressWarnings({ "rawtypes", "deprecation" }) protected Map getCIFObject(String key) { - LOGGER.debug("=======Pinging HBase For: {}", key); Get get = new Get(key.getBytes()); @@ -79,13 +80,11 @@ protected Map getCIFObject(String key) { try { rs = table.get(get); - - for (KeyValue kv : rs.raw()) - output.put(new String(kv.getQualifier()), "Y"); - + for (Cell cell: rs.rawCells()) { + output.put(new String(cell.getQualifierArray()), "Y"); + } } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + LOGGER.error("Unexpected exception", e); } return output; } @@ -102,8 +101,8 @@ public boolean initializeAdapter(Map config) { try { LOGGER.debug("=======Connecting to HBASE==========="); LOGGER.debug("=======ZOOKEEPER = {}", conf.get("hbase.zookeeper.quorum")); - HConnection connection = HConnectionManager.createConnection(conf); - table = connection.getTable(_tableName); + Connection connection = ConnectionFactory.createConnection(conf); + table = connection.getTable(TableName.valueOf(_tableName)); return true; } catch (IOException e) { LOGGER.debug("=======Unable to Connect to HBASE==========="); diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java index c84c494ffe..a641c02f63 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java @@ -20,18 +20,16 @@ import com.google.common.collect.Iterables; -import java.io.IOException; -import java.io.Serializable; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.metron.enrichment.cache.CacheKey; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; import org.apache.metron.enrichment.lookup.EnrichmentLookup; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.NoopAccessTracker; import org.apache.metron.enrichment.utils.EnrichmentUtils; @@ -39,6 +37,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; + public class SimpleHBaseAdapter implements EnrichmentAdapter,Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected SimpleHBaseConfig config; @@ -61,7 +65,7 @@ public void logAccess(CacheKey value) { public boolean isInitialized() { - return lookup != null && lookup.getTable() != null; + return lookup != null; } @Override public JSONObject enrich(CacheKey value) { @@ -105,17 +109,23 @@ public JSONObject enrich(CacheKey value) { @Override public boolean initializeAdapter(Map configuration) { - String hbaseTable = config.getHBaseTable(); - Configuration hbaseConfig = HBaseConfiguration.create(); + if(config == null) { + LOG.error("Unable to initialize adapter. No configuration provided"); + return false; + } + try { - lookup = new EnrichmentLookup( config.getProvider().getTable(hbaseConfig, hbaseTable) - , config.getHBaseCF() - , new NoopAccessTracker() - ); + EnrichmentLookupFactory factory = config.getEnrichmentLookupFactory(); + lookup = factory.create(config.getConnectionFactory(), + HBaseConfiguration.create(), + config.getHBaseTable(), + config.getHBaseCF(), + new NoopAccessTracker()); } catch (IOException e) { LOG.error("Unable to initialize adapter: {}", e.getMessage(), e); return false; } + return true; } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java index fefe008863..ec10827548 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java @@ -17,39 +17,61 @@ */ package org.apache.metron.enrichment.adapters.simplehbase; -import org.apache.metron.enrichment.utils.EnrichmentUtils; -import org.apache.metron.hbase.HTableProvider; -import org.apache.metron.hbase.TableProvider; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactories; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.Serializable; - +/** + * Configures the {@link SimpleHBaseAdapter}. + */ public class SimpleHBaseConfig implements Serializable { private String hBaseTable; private String hBaseCF; - private TableProvider provider = new HTableProvider(); + private HBaseConnectionFactory connectionFactory = new HBaseConnectionFactory(); + private EnrichmentLookupFactory enrichmentLookupFactory = EnrichmentLookupFactories.HBASE; + public String getHBaseTable() { return hBaseTable; } + + public SimpleHBaseConfig withHBaseTable(String hBaseTable) { + this.hBaseTable = hBaseTable; + return this; + } + public String getHBaseCF() { return hBaseCF; } - public TableProvider getProvider() { - return provider; + public SimpleHBaseConfig withHBaseCF(String cf) { + this.hBaseCF= cf; + return this; + } + + public HBaseConnectionFactory getConnectionFactory() { + return connectionFactory; } - public SimpleHBaseConfig withProviderImpl(String connectorImpl) { - provider = EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider()); + public SimpleHBaseConfig withConnectionFactoryImpl(String connectorImpl) { + connectionFactory = HBaseConnectionFactory.byName(connectorImpl); return this; } - public SimpleHBaseConfig withHBaseTable(String hBaseTable) { - this.hBaseTable = hBaseTable; + + public EnrichmentLookupFactory getEnrichmentLookupFactory() { + return enrichmentLookupFactory; + } + + public SimpleHBaseConfig withEnrichmentLookupFactory(EnrichmentLookupFactory enrichmentLookupFactory) { + this.enrichmentLookupFactory = enrichmentLookupFactory; return this; } - public SimpleHBaseConfig withHBaseCF(String cf) { - this.hBaseCF= cf; + public SimpleHBaseConfig withEnrichmentLookupFactory(String clazzName) { + this.enrichmentLookupFactory = EnrichmentLookupFactories.byName(clazzName); return this; } } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java index 8515c681d7..294e3bf227 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java @@ -30,9 +30,11 @@ import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; import org.apache.metron.enrichment.lookup.EnrichmentLookup; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker; import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker; import org.apache.metron.enrichment.utils.EnrichmentUtils; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +107,7 @@ public JSONObject enrich(CacheKey value) { } public boolean isInitialized() { - return lookup != null && lookup.getTable() != null; + return lookup != null; } @Override @@ -119,15 +121,19 @@ public boolean initializeAdapter(Map configuration) { long millisecondsBetweenPersist = config.getMillisecondsBetweenPersists(); BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives); Configuration hbaseConfig = HBaseConfiguration.create(); + HBaseConnectionFactory connectionFactory = config.getConnectionFactory(); + EnrichmentLookupFactory lookupFactory = config.getEnrichmentLookupFactory(); try { accessTracker = new PersistentAccessTracker( hbaseTable , UUID.randomUUID().toString() - , config.getProvider().getTable(hbaseConfig, trackerHBaseTable) + , trackerHBaseTable , trackerHBaseCF , bat , millisecondsBetweenPersist - ); - lookup = new EnrichmentLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker); + , connectionFactory + , hbaseConfig); + + lookup = lookupFactory.create(connectionFactory, hbaseConfig, hbaseTable, config.getHBaseCF(), accessTracker); } catch (IOException e) { LOG.error("Unable to initialize ThreatIntelAdapter", e); return false; diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java index 2d63eab2c7..e1c7458468 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java @@ -17,9 +17,9 @@ */ package org.apache.metron.enrichment.adapters.threatintel; -import org.apache.metron.enrichment.utils.EnrichmentUtils; -import org.apache.metron.hbase.HTableProvider; -import org.apache.metron.hbase.TableProvider; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactories; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.Serializable; @@ -32,7 +32,8 @@ public class ThreatIntelConfig implements Serializable { private String trackerHBaseTable; private String trackerHBaseCF; private long millisecondsBetweenPersists = 2*MS_IN_HOUR; - private TableProvider provider = new HTableProvider(); + private HBaseConnectionFactory connectionFactory = new HBaseConnectionFactory(); + private EnrichmentLookupFactory enrichmentLookupFactory = EnrichmentLookupFactories.HBASE; public String getHBaseTable() { return hBaseTable; @@ -62,13 +63,12 @@ public String getHBaseCF() { return hBaseCF; } - public TableProvider getProvider() { - return provider; + public HBaseConnectionFactory getConnectionFactory() { + return connectionFactory; } - public ThreatIntelConfig withProviderImpl(String connectorImpl) { - provider = EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider()); - return this; + public EnrichmentLookupFactory getEnrichmentLookupFactory() { + return enrichmentLookupFactory; } public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) { @@ -104,4 +104,19 @@ public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetwee this.millisecondsBetweenPersists = millisecondsBetweenPersists; return this; } + + public ThreatIntelConfig withConnectionFactory(HBaseConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + return this; + } + + public ThreatIntelConfig withEnrichmentLookupFactory(EnrichmentLookupFactory enrichmentLookupFactory) { + this.enrichmentLookupFactory = enrichmentLookupFactory; + return this; + } + + public ThreatIntelConfig withEnrichmentLookupFactory(String clazzName) { + this.enrichmentLookupFactory = EnrichmentLookupFactories.byName(clazzName); + return this; + } } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java index 4b576772b6..86abbdac19 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/AbstractConverter.java @@ -31,7 +31,12 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.*; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; public abstract class AbstractConverter implements HbaseConverter { @@ -40,19 +45,34 @@ public abstract class AbstractConverter apply(@Nullable Cell cell) { - return new AbstractMap.SimpleEntry<>(cell.getQualifier(), cell.getValue()); + return new AbstractMap.SimpleEntry<>(getQualifier(cell), getValue(cell)); } }; + @Override public Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException { Put put = new Put(key.toBytes()); byte[] cf = Bytes.toBytes(columnFamily); for(Map.Entry kv : values.toColumns()) { - put.add(cf, kv.getKey(), kv.getValue()); + put.addColumn(cf, kv.getKey(), kv.getValue()); } return put; } + private static byte[] getQualifier(Cell cell) { + int length = cell.getQualifierLength(); + int offset = cell.getQualifierOffset(); + byte[] bytes = Arrays.copyOfRange(cell.getRowArray(), offset, offset + length); + return bytes; + } + + private static byte[] getValue(Cell cell) { + int length = cell.getValueLength(); + int offset = cell.getValueOffset(); + byte[] bytes = Arrays.copyOfRange(cell.getRowArray(), offset, offset + length); + return bytes; + } + public LookupKV fromPut(Put put, String columnFamily, KEY_T key, VALUE_T value) throws IOException { key.fromBytes(put.getRow()); byte[] cf = Bytes.toBytes(columnFamily); diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java index bd019a0f3a..6840bc79bf 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentValue.java @@ -23,6 +23,7 @@ import org.codehaus.jackson.type.TypeReference; import java.io.IOException; +import java.util.HashMap; import java.util.Map; public class EnrichmentValue implements LookupValue { @@ -35,18 +36,20 @@ protected ObjectMapper initialValue() { public static final String VALUE_COLUMN_NAME = "v"; public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME); - private Map metadata = null; - - public EnrichmentValue() - { + private Map metadata; + public EnrichmentValue() { + metadata = new HashMap<>(); } public EnrichmentValue(Map metadata) { this.metadata = metadata; } - + public EnrichmentValue withValue(String key, Object value) { + metadata.put(key, value); + return this; + } public Map getMetadata() { return metadata; @@ -66,6 +69,14 @@ public void fromColumns(Iterable> values) { } } } + + public void fromColumn(byte[] columnQualifier, byte[] value) { + String columnValue = Bytes.toString(value); + if(Bytes.equals(columnQualifier, VALUE_COLUMN_NAME_B)) { + metadata = stringToValue(columnValue); + } + } + public Map stringToValue(String s){ try { return _mapper.get().readValue(s, new TypeReference>(){}); diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java index 3ee9754fca..8d563f65d9 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookup.java @@ -19,8 +19,8 @@ import com.google.common.collect.Iterables; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.metron.enrichment.converter.HbaseConverter; import org.apache.metron.enrichment.converter.EnrichmentConverter; import org.apache.metron.enrichment.converter.EnrichmentKey; @@ -38,14 +38,14 @@ public class EnrichmentLookup extends Lookup> implements AutoCloseable { public static class HBaseContext { - private HTableInterface table; + private Table table; private String columnFamily; - public HBaseContext(HTableInterface table, String columnFamily) { + public HBaseContext(Table table, String columnFamily) { this.table = table; this.columnFamily = columnFamily; } - public HTableInterface getTable() { return table; } + public Table getTable() { return table; } public String getColumnFamily() { return columnFamily; } } @@ -84,7 +84,7 @@ public Iterable exists(Iterable> get( Iterable> ret = new ArrayList<>(); Iterator> keyWithContextIterator = keys.iterator(); for(Result result : table.get(keysToGets(keys))) { @@ -115,14 +115,22 @@ public void close() throws Exception { } } - private HTableInterface table; - public EnrichmentLookup(HTableInterface table, String columnFamily, AccessTracker tracker) { + private Table table; + public EnrichmentLookup(Table table, String columnFamily, AccessTracker tracker) { this.table = table; this.setLookupHandler(new Handler(columnFamily)); this.setAccessTracker(tracker); } - public HTableInterface getTable() { + protected EnrichmentLookup() { + /* + * A default constructor is required to allow the FakeEnrichmentLookup + * to be serialized by Storm during the integration tests. This is because + * FakeEnrichmentLookup inherits from EnrichmentLookup. + */ + } + + public Table getTable() { return table; } @@ -131,4 +139,4 @@ public void close() throws Exception { super.close(); table.close(); } -} +} \ No newline at end of file diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactories.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactories.java new file mode 100644 index 0000000000..e90df68eb8 --- /dev/null +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactories.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.lookup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; +import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker; +import org.apache.metron.hbase.client.HBaseConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.InvocationTargetException; + +/** + * Enumerates the available {@link EnrichmentLookupFactory} implementations. + */ +public enum EnrichmentLookupFactories implements EnrichmentLookupFactory { + + HBASE((connFactory, conf, tableName, columnFamily, accessTracker) -> { + Connection connection = connFactory.createConnection(conf); + Table table = connection.getTable(TableName.valueOf(tableName)); + return new EnrichmentLookup(table, columnFamily, accessTracker); + }); + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private EnrichmentLookupFactory factory; + + EnrichmentLookupFactories(EnrichmentLookupFactory factory) { + this.factory = factory; + } + + @Override + public EnrichmentLookup create(HBaseConnectionFactory connectionFactory, + Configuration configuration, + String tableName, + String columnFamily, + AccessTracker accessTracker) throws IOException { + return factory.create(connectionFactory, configuration, tableName, columnFamily, accessTracker); + } + + /** + * Creates an {@link EnrichmentLookupFactory}. + * + * @param name Either an enum value or a fully-qualified class name. + * @return A {@link EnrichmentLookupFactory}. + */ + public static EnrichmentLookupFactory byName(String name) { + // is this an enum? + try { + return EnrichmentLookupFactories.valueOf(name); + } catch (IllegalArgumentException e) { + LOG.debug("Cannot find EnrichmentLookupFactory by enum name, assuming this is a class name; name={}", name); + } + + // is this a class name? a class name may be used during testing + try { + Class clazz = (Class) Class.forName(name); + return clazz.getConstructor().newInstance(); + } catch (InstantiationException | NoSuchMethodException | IllegalAccessException | ClassNotFoundException | InvocationTargetException e) { + throw new IllegalStateException("Unable to instantiate EnrichmentLookupFactory.", e); + } + } +} diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactory.java similarity index 55% rename from metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java rename to metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactory.java index 475ee8c6e6..50ad6cc041 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/converter/EnrichmentHelper.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactory.java @@ -15,22 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.metron.enrichment.converter; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.metron.enrichment.lookup.LookupKV; +package org.apache.metron.enrichment.lookup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.IOException; +import java.io.Serializable; -public enum EnrichmentHelper { - INSTANCE; - EnrichmentConverter converter = new EnrichmentConverter(); +/** + * Responsible for creating an {@link EnrichmentLookup}. + */ +public interface EnrichmentLookupFactory extends Serializable { - public void load(HTableInterface table, String cf, Iterable> results) throws IOException { - for(LookupKV result : results) { - Put put = converter.toPut(cf, result.getKey(), result.getValue()); - table.put(put); - } - } + EnrichmentLookup create(HBaseConnectionFactory connectionFactory, + Configuration configuration, + String tableName, + String columnFamily, + AccessTracker accessTracker) throws IOException; } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java index f4d0c4cd7c..2121e952ab 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java @@ -17,11 +17,11 @@ */ package org.apache.metron.enrichment.lookup.accesstracker; -import org.apache.metron.hbase.TableProvider; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.IOException; import java.util.Map; public interface AccessTrackerCreator { - public AccessTracker create(Map config, TableProvider provider) throws IOException; + AccessTracker create(Map config, HBaseConnectionFactory connectionFactory) throws IOException; } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java index 5d880f2041..204abeab6a 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerUtil.java @@ -19,11 +19,19 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import javax.annotation.Nullable; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; public enum AccessTrackerUtil { INSTANCE; @@ -44,13 +52,13 @@ public byte[] serializeTracker(AccessTracker tracker) throws IOException { } - public void persistTracker(HTableInterface accessTrackerTable, String columnFamily, PersistentAccessTracker.AccessTrackerKey key, AccessTracker underlyingTracker) throws IOException { + public void persistTracker(Table accessTrackerTable, String columnFamily, PersistentAccessTracker.AccessTrackerKey key, AccessTracker underlyingTracker) throws IOException { Put put = new Put(key.toRowKey()); - put.add(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker)); + put.addColumn(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker)); accessTrackerTable.put(put); } - public Iterable loadAll(HTableInterface accessTrackerTable, final String columnFamily, final String name, final long earliest) throws IOException { + public Iterable loadAll(Table accessTrackerTable, final String columnFamily, final String name, final long earliest) throws IOException { Scan scan = new Scan(PersistentAccessTracker.AccessTrackerKey.getTimestampScanKey(name, earliest)); ResultScanner scanner = accessTrackerTable.getScanner(scan); return Iterables.transform(scanner, new Function() { diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java index 7dad6ebca4..9d431318c6 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java @@ -18,7 +18,7 @@ package org.apache.metron.enrichment.lookup.accesstracker; -import org.apache.metron.hbase.TableProvider; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import java.io.IOException; import java.util.Map; @@ -31,9 +31,8 @@ public enum AccessTrackers implements AccessTrackerCreator { this.creator = creator; } - @Override - public AccessTracker create(Map config, TableProvider provider) throws IOException { - return creator.create(config, provider); + public AccessTracker create(Map config, HBaseConnectionFactory connectionFactory) throws IOException { + return creator.create(config, connectionFactory); } } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentAccessTracker.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentAccessTracker.java index fd0cd61b1c..6020009c77 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentAccessTracker.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentAccessTracker.java @@ -17,6 +17,15 @@ */ package org.apache.metron.enrichment.lookup.accesstracker; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; +import org.apache.metron.enrichment.lookup.LookupKey; +import org.apache.metron.hbase.client.HBaseConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -26,10 +35,6 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.metron.enrichment.lookup.LookupKey; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PersistentAccessTracker implements AccessTracker { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -102,7 +107,8 @@ public void run() { } final Object sync = new Object(); - HTableInterface accessTrackerTable; + private Connection connection; + private Table accessTrackerTable; String accessTrackerColumnFamily; AccessTracker underlyingTracker; long timestamp = System.currentTimeMillis(); @@ -113,14 +119,15 @@ public void run() { public PersistentAccessTracker( String name , String containerName - , HTableInterface accessTrackerTable + , String trackerTableName , String columnFamily , AccessTracker underlyingTracker , long maxMillisecondsBetweenPersists + , HBaseConnectionFactory connectionFactory + , Configuration configuration ) { this.containerName = containerName; - this.accessTrackerTable = accessTrackerTable; this.name = name; this.accessTrackerColumnFamily = columnFamily; this.underlyingTracker = underlyingTracker; @@ -129,6 +136,16 @@ public PersistentAccessTracker( String name if(maxMillisecondsBetweenPersists > 0) { timer.scheduleAtFixedRate(new Persister(this), maxMillisecondsBetweenPersists, maxMillisecondsBetweenPersists); } + + try { + this.connection = connectionFactory.createConnection(configuration); + this.accessTrackerTable = connection.getTable(TableName.valueOf(trackerTableName)); + + } catch(IOException e) { + LOG.error("Unable to connection to HBase table '{}", name, e); + throw new IllegalStateException(e); + } + } public void persist(boolean force) { @@ -205,6 +222,7 @@ public void cleanup() throws IOException { } underlyingTracker.cleanup(); accessTrackerTable.close(); + connection.close(); } } } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java index a7b1b5f6b2..e0e2c4f856 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java @@ -19,8 +19,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import org.apache.metron.stellar.common.utils.ConversionUtils; -import org.apache.metron.hbase.TableProvider; import java.io.IOException; import java.util.Map; @@ -80,7 +80,7 @@ public long getMillisecondsBetweenPersists() { } @Override - public AccessTracker create(Map config, TableProvider provider) throws IOException { + public AccessTracker create(Map config, HBaseConnectionFactory connectionFactory) throws IOException { Config patConfig = new Config(config); String hbaseTable = patConfig.getHBaseTable(); int expectedInsertions = patConfig.getExpectedInsertions(); @@ -89,13 +89,15 @@ public AccessTracker create(Map config, TableProvider provider) BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives); Configuration hbaseConfig = HBaseConfiguration.create(); - AccessTracker ret = new PersistentAccessTracker( hbaseTable - , UUID.randomUUID().toString() - , provider.getTable(hbaseConfig, hbaseTable) - , patConfig.getHBaseCF() - , bat - , millisecondsBetweenPersist - ); + AccessTracker ret = new PersistentAccessTracker(hbaseTable + , UUID.randomUUID().toString() + , hbaseTable + , patConfig.getHBaseCF() + , bat + , millisecondsBetweenPersist + , connectionFactory + , hbaseConfig + ); return ret; } } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java index f8cdf362fe..88b79df0d7 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java @@ -25,16 +25,20 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; + +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.lookup.EnrichmentLookup; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactories; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker; import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackers; -import org.apache.metron.hbase.HTableProvider; -import org.apache.metron.hbase.TableProvider; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.ParseException; import org.apache.metron.stellar.dsl.Stellar; @@ -45,11 +49,14 @@ public class SimpleHBaseEnrichmentFunctions { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String ACCESS_TRACKER_TYPE_CONF = "accessTracker"; - public static final String TABLE_PROVIDER_TYPE_CONF = "tableProviderImpl"; + public static final String CONNECTION_FACTORY_IMPL_CONF = "connectionFactoryImpl"; private static AccessTracker tracker; - private static TableProvider provider; - + private static HBaseConnectionFactory connectionFactory; + /** + * Serves as a key for the cache of {@link EnrichmentLookup} objects used + * to lookup the enrichment values. + */ private static class Table { String name; String columnFamily; @@ -92,30 +99,33 @@ private static Map getConfig(Context context) { return (Map) context.getCapability(Context.Capabilities.GLOBAL_CONFIG).orElse(new HashMap<>()); } - private static synchronized void initializeTracker(Map config, TableProvider provider) throws IOException { + private static synchronized void initializeTracker(Map config, HBaseConnectionFactory connectionFactory) throws IOException { if(tracker == null) { String accessTrackerType = (String) config.getOrDefault(ACCESS_TRACKER_TYPE_CONF, AccessTrackers.NOOP.toString()); AccessTrackers trackers = AccessTrackers.valueOf(accessTrackerType); - tracker = trackers.create(config, provider); + tracker = trackers.create(config, connectionFactory); } } - private static TableProvider createProvider(String tableProviderClass) { - try { - Class providerClazz = (Class) Class.forName(tableProviderClass); - return providerClazz.getConstructor().newInstance(); - } catch (Exception e) { - return new HTableProvider(); + private static synchronized void initializeConnectionFactory(Map config) { + if(connectionFactory == null) { + String connectionFactoryImpl = (String) config.getOrDefault(CONNECTION_FACTORY_IMPL_CONF, HBaseConnectionFactory.class.getName()); + connectionFactory = HBaseConnectionFactory.byName(connectionFactoryImpl); } } - private static synchronized void initializeProvider( Map config) { - if(provider != null) { - return ; - } - else { - String tableProviderClass = (String) config.getOrDefault(TABLE_PROVIDER_TYPE_CONF, HTableProvider.class.getName()); - provider = createProvider(tableProviderClass); + /** + * Closes the {@link EnrichmentLookup} after it has been removed from the cache. This + * ensures that the underlying resources are cleaned up. + */ + private static class CloseEnrichmentLookup implements RemovalListener { + @Override + public void onRemoval(RemovalNotification notification) { + try { + notification.getValue().close(); + } catch(Throwable e) { + LOG.error("Failed to close EnrichmentLookup; cause={}", notification.getCause(), e); + } } } @@ -133,8 +143,21 @@ private static synchronized void initializeProvider( Map config) ) public static class EnrichmentExists implements StellarFunction { boolean initialized = false; + private EnrichmentLookupFactory factory; + private Configuration configuration; private static Cache enrichmentCollateralCache = CacheBuilder.newBuilder() - .build(); + .removalListener(new CloseEnrichmentLookup()) + .build(); + + public EnrichmentExists() { + this(EnrichmentLookupFactories.HBASE, HBaseConfiguration.create()); + } + + public EnrichmentExists(EnrichmentLookupFactory factory, Configuration configuration) { + this.factory = factory; + this.configuration = configuration; + } + @Override public Object apply(List args, Context context) throws ParseException { if(!initialized) { @@ -148,17 +171,15 @@ public Object apply(List args, Context context) throws ParseException { String indicator = (String) args.get(i++); String table = (String) args.get(i++); String cf = (String) args.get(i++); - if(enrichmentType == null || indicator == null) { + if (enrichmentType == null || indicator == null || table == null || cf == null) { return false; } final Table key = new Table(table, cf); EnrichmentLookup lookup = null; try { - lookup = enrichmentCollateralCache.get(key, () -> { - HTableInterface hTable = provider.getTable(HBaseConfiguration.create(), key.name); - return new EnrichmentLookup(hTable, key.columnFamily, tracker); - } - ); + lookup = enrichmentCollateralCache.get(key, () -> + factory.create(connectionFactory, configuration, table, cf, tracker)); + } catch (ExecutionException e) { LOG.error("Unable to retrieve enrichmentLookup: {}", e.getMessage(), e); return false; @@ -176,12 +197,11 @@ public Object apply(List args, Context context) throws ParseException { public void initialize(Context context) { try { Map config = getConfig(context); - initializeProvider(config); - initializeTracker(config, provider); + initializeConnectionFactory(config); + initializeTracker(config, connectionFactory); } catch (IOException e) { - LOG.error("Unable to initialize ENRICHMENT.EXISTS: {}", e.getMessage(), e); - } - finally{ + LOG.error("Unable to initialize ENRICHMENT_EXISTS: {}", e.getMessage(), e); + } finally{ initialized = true; } @@ -192,6 +212,11 @@ public boolean isInitialized() { return initialized; } + @Override + public void close() { + enrichmentCollateralCache.invalidateAll(); + enrichmentCollateralCache.cleanUp(); + } } @@ -210,11 +235,29 @@ public boolean isInitialized() { ) public static class EnrichmentGet implements StellarFunction { boolean initialized = false; - private static Cache enrichmentCollateralCache = CacheBuilder.newBuilder() - .build(); + private EnrichmentLookupFactory factory; + private Configuration configuration; + private static Cache enrichmentCollateralCache = CacheBuilder + .newBuilder() + .removalListener(new CloseEnrichmentLookup()) + .build(); + + /** + * The constructor used during Stellar function resolution. + */ + public EnrichmentGet() { + this(EnrichmentLookupFactories.HBASE, HBaseConfiguration.create()); + } + + public EnrichmentGet(EnrichmentLookupFactory factory, Configuration configuration) { + this.factory = factory; + this.configuration = configuration; + } + @Override public Object apply(List args, Context context) throws ParseException { if(!initialized) { + LOG.debug("ENRICHMENT_GET not initialized"); return false; } if(args.size() != 4) { @@ -231,11 +274,8 @@ public Object apply(List args, Context context) throws ParseException { final Table key = new Table(table, cf); EnrichmentLookup lookup = null; try { - lookup = enrichmentCollateralCache.get(key, () -> { - HTableInterface hTable = provider.getTable(HBaseConfiguration.create(), key.name); - return new EnrichmentLookup(hTable, key.columnFamily, tracker); - } - ); + lookup = enrichmentCollateralCache.get(key, () -> + factory.create(connectionFactory, configuration, table, cf, tracker)); } catch (ExecutionException e) { LOG.error("Unable to retrieve enrichmentLookup: {}", e.getMessage(), e); return new HashMap(); @@ -257,16 +297,23 @@ public Object apply(List args, Context context) throws ParseException { public void initialize(Context context) { try { Map config = getConfig(context); - initializeProvider(config); - initializeTracker(config, provider); + initializeConnectionFactory(config); + initializeTracker(config, connectionFactory); + } catch (IOException e) { - LOG.error("Unable to initialize ENRICHMENT.GET: {}", e.getMessage(), e); - } - finally{ + LOG.error("Unable to initialize ENRICHMENT_GET: {}", e.getMessage(), e); + + } finally { initialized = true; } } + @Override + public void close() { + enrichmentCollateralCache.invalidateAll(); + enrichmentCollateralCache.cleanUp(); + } + @Override public boolean isInitialized() { return initialized; diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java index 9a36a87cb7..1bc1a12f92 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java @@ -21,17 +21,15 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.lookup.handler.KeyWithContext; -import org.apache.metron.hbase.TableProvider; import org.json.simple.JSONObject; public class EnrichmentUtils { @@ -45,8 +43,8 @@ public static String getEnrichmentKey(String enrichmentName, String field) { public static class TypeToKey implements Function> { private final String indicator; private final EnrichmentConfig config; - private final HTableInterface table; - public TypeToKey(String indicator, HTableInterface table, EnrichmentConfig config) { + private final Table table; + public TypeToKey(String indicator, Table table, EnrichmentConfig config) { this.indicator = indicator; this.config = config; this.table = table; @@ -95,28 +93,6 @@ public static String toTopLevelField(String field) { return Iterables.getLast(Splitter.on('.').split(field)); } - public static TableProvider getTableProvider(String connectorImpl, TableProvider defaultImpl) { - if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') { - return defaultImpl; - } - else { - try { - Class clazz = (Class) Class.forName(connectorImpl); - return clazz.getConstructor().newInstance(); - } catch (InstantiationException e) { - throw new IllegalStateException("Unable to instantiate connector.", e); - } catch (IllegalAccessException e) { - throw new IllegalStateException("Unable to instantiate connector: illegal access", e); - } catch (InvocationTargetException e) { - throw new IllegalStateException("Unable to instantiate connector", e); - } catch (NoSuchMethodException e) { - throw new IllegalStateException("Unable to instantiate connector: no such method", e); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("Unable to instantiate connector: class not found", e); - } - } - } - public static JSONObject adjustKeys(JSONObject enrichedMessage, JSONObject enrichedField, String field, String prefix) { if ( !enrichedField.isEmpty()) { for (Object enrichedKey : enrichedField.keySet()) { diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java index ef04370a68..1f19a453f1 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java @@ -25,12 +25,9 @@ import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.lookup.EnrichmentLookup; -import org.apache.metron.enrichment.converter.EnrichmentHelper; -import org.apache.metron.hbase.mock.MockHTable; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker; -import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookup; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookupFactory; +import org.apache.metron.hbase.client.FakeHBaseConnectionFactory; import org.apache.metron.common.utils.JSONUtils; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -38,17 +35,16 @@ import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -public class SimpleHBaseAdapterTest { +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; - private String cf = "cf"; - private String cf1 = "cf1"; - private String atTableName = "tracker"; - private final String hbaseTableName = "enrichments"; - private EnrichmentLookup lookup; +public class SimpleHBaseAdapterTest { + private SimpleHBaseConfig config; + private FakeEnrichmentLookup lookup; private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification"; private static final String CF1_CLASSIFICATION_TYPE = "cf1"; private static final Map CF1_ENRICHMENT = new HashMap() {{ @@ -101,33 +97,35 @@ public class SimpleHBaseAdapterTest { private String sourceConfigWithCFStr; private JSONObject expectedMessage; + + + @Before public void setup() throws Exception { - final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(atTableName, cf); - final MockHTable hbaseTable = (MockHTable) MockHBaseTableProvider.addToCache(hbaseTableName, cf); - EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList>() {{ - add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3") - , new EnrichmentValue(PLAYFUL_ENRICHMENT) - ) - ); - }}); - EnrichmentHelper.INSTANCE.load(hbaseTable, cf1, new ArrayList>() {{ - add(new LookupKV<>(new EnrichmentKey(CF1_CLASSIFICATION_TYPE, "10.0.2.4") - , new EnrichmentValue(CF1_ENRICHMENT) - ) - ); - }}); - BloomAccessTracker bat = new BloomAccessTracker(hbaseTableName, 100, 0.03); - PersistentAccessTracker pat = new PersistentAccessTracker(hbaseTableName, "0", trackerTable, cf, bat, 0L); - lookup = new EnrichmentLookup(hbaseTable, cf, pat); + // the enrichments are retrieved from memory, rather than HBase for these tests + lookup = new FakeEnrichmentLookup(); + lookup.deleteAll(); + lookup.withEnrichment( + new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3"), + new EnrichmentValue(PLAYFUL_ENRICHMENT)); + lookup.withEnrichment( + new EnrichmentKey(CF1_CLASSIFICATION_TYPE, "10.0.2.4"), + new EnrichmentValue(CF1_ENRICHMENT)); + + config = new SimpleHBaseConfig() + .withHBaseTable("enrichment") + .withHBaseCF("cf") + .withEnrichmentLookupFactory(new FakeEnrichmentLookupFactory(lookup)); + JSONParser jsonParser = new JSONParser(); expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString); } @Test public void testEnrich() throws Exception { - SimpleHBaseAdapter sha = new SimpleHBaseAdapter(); + SimpleHBaseAdapter sha = new SimpleHBaseAdapter(config); sha.lookup = lookup; + sha.initializeAdapter(new HashMap<>()); SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class); JSONObject actualMessage = sha.enrich(new CacheKey("test", "test", broSc)); Assert.assertEquals(actualMessage, new JSONObject()); @@ -138,8 +136,9 @@ public void testEnrich() throws Exception { @Test public void testEnrichNonStringValue() throws Exception { - SimpleHBaseAdapter sha = new SimpleHBaseAdapter(); + SimpleHBaseAdapter sha = new SimpleHBaseAdapter(config); sha.lookup = lookup; + sha.initializeAdapter(new HashMap<>()); SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class); JSONObject actualMessage = sha.enrich(new CacheKey("test", "test", broSc)); Assert.assertEquals(actualMessage, new JSONObject()); @@ -149,8 +148,9 @@ public void testEnrichNonStringValue() throws Exception { @Test public void testMultiColumnFamilies() throws Exception { - SimpleHBaseAdapter sha = new SimpleHBaseAdapter(); + SimpleHBaseAdapter sha = new SimpleHBaseAdapter(config); sha.lookup = lookup; + sha.initializeAdapter(new HashMap<>()); SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigWithCFStr, SensorEnrichmentConfig.class); JSONObject actualMessage = sha.enrich(new CacheKey("test", "test", broSc)); Assert.assertEquals(actualMessage, new JSONObject()); @@ -159,22 +159,25 @@ public void testMultiColumnFamilies() throws Exception { Assert.assertEquals(new JSONObject(ImmutableMap.of("cf1.key", "value")), actualMessage); } - @Test - public void testMultiColumnFamiliesWrongCF() throws Exception { - SimpleHBaseAdapter sha = new SimpleHBaseAdapter(); - sha.lookup = lookup; - SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class); - JSONObject actualMessage = sha.enrich(new CacheKey("test", "test", broSc)); - Assert.assertEquals(actualMessage, new JSONObject()); - actualMessage = sha.enrich(new CacheKey("ip_dst_addr", "10.0.2.4", broSc)); - Assert.assertNotNull(actualMessage); - Assert.assertEquals(new JSONObject(new HashMap()), actualMessage); - } @Test(expected = Exception.class) public void testInitializeAdapter() { SimpleHBaseConfig config = new SimpleHBaseConfig(); + config.withConnectionFactoryImpl(FakeHBaseConnectionFactory.class.getName()); SimpleHBaseAdapter sha = new SimpleHBaseAdapter(config); sha.initializeAdapter(null); } + @Test + public void testCleanup() throws Exception { + EnrichmentLookup mockLookup = mock(EnrichmentLookup.class); + config.withEnrichmentLookupFactory((v,w,x,y,z) -> mockLookup); + + SimpleHBaseAdapter sha = new SimpleHBaseAdapter(config); + sha.initializeAdapter(new HashMap<>()); + sha.cleanup(); + + // the adapter should close the EnrichmentLookup that it is using to free any resources + verify(mockLookup, times(1)).close(); + } + } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfigTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfigTest.java index 832a939932..07c90a641e 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfigTest.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfigTest.java @@ -17,26 +17,28 @@ */ package org.apache.metron.enrichment.adapters.simplehbase; -import org.apache.metron.hbase.HTableProvider; -import org.apache.metron.hbase.TableProvider; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookupFactory; +import org.apache.metron.hbase.client.HBaseConnectionFactory; import org.junit.Assert; import org.junit.Test; public class SimpleHBaseConfigTest { - - - private String cf ="cf"; - private String table = "threatintel"; - private TableProvider provider; + private final String cf ="cf"; + private final String table = "threatintel"; @Test - public void test(){ + public void test() { + FakeEnrichmentLookupFactory factory = new FakeEnrichmentLookupFactory(); + SimpleHBaseConfig shc = new SimpleHBaseConfig(); shc.withHBaseCF(cf); shc.withHBaseTable(table); - provider = new HTableProvider(); + shc.withConnectionFactoryImpl(HBaseConnectionFactory.class.getName()); + shc.withEnrichmentLookupFactory(factory); + Assert.assertEquals(cf, shc.getHBaseCF()); Assert.assertEquals(table, shc.getHBaseTable()); + Assert.assertTrue(shc.getConnectionFactory() instanceof HBaseConnectionFactory); + Assert.assertEquals(factory, shc.getEnrichmentLookupFactory()); } - } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java index bf51b4c015..5484fa8f08 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java @@ -18,22 +18,15 @@ package org.apache.metron.enrichment.adapters.threatintel; import org.adrianwalker.multilinestring.Multiline; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.log4j.Level; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.cache.CacheKey; -import org.apache.metron.hbase.TableProvider; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; -import org.apache.metron.enrichment.lookup.EnrichmentLookup; -import org.apache.metron.enrichment.converter.EnrichmentHelper; -import org.apache.metron.hbase.mock.MockHTable; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker; -import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker; -import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookup; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookupFactory; +import org.apache.metron.hbase.client.FakeHBaseConnectionFactory; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -41,28 +34,16 @@ import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; public class ThreatIntelAdapterTest { - public static class ExceptionProvider implements TableProvider { - - public ExceptionProvider() {}; - - @Override - public HTableInterface getTable(Configuration config, String tableName) throws IOException { - throw new IOException(); - } - } - - private String cf = "cf"; - private String atTableName = "tracker"; + private SensorEnrichmentConfig config; private static final String MALICIOUS_IP_TYPE = "malicious_ip"; - private final String threatIntelTableName = "threat_intel"; - private EnrichmentLookup lookup; + private FakeEnrichmentLookup lookup; + private ThreatIntelAdapter threatIntelAdapter; + private ThreatIntelConfig threatIntelConfig; /** { @@ -98,44 +79,77 @@ public HTableInterface getTable(Configuration config, String tableName) throws I @Before public void setup() throws Exception { + // deserialize the enrichment configuration + config = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class); + + // create an enrichment where the indicator is the IP address + lookup = new FakeEnrichmentLookup() + .withEnrichment( + new EnrichmentKey("10.0.2.3", "10.0.2.3"), + new EnrichmentValue()); + + threatIntelConfig = new ThreatIntelConfig() + .withHBaseTable("enrichment") + .withHBaseCF("cf") + .withTrackerHBaseTable("tracker") + .withTrackerHBaseCF("cf") + .withConnectionFactory(new FakeHBaseConnectionFactory()) + .withEnrichmentLookupFactory(new FakeEnrichmentLookupFactory(lookup)); + + threatIntelAdapter = new ThreatIntelAdapter(threatIntelConfig); + threatIntelAdapter.lookup = lookup; + threatIntelAdapter.initializeAdapter(new HashMap<>()); - final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(atTableName, cf); - final MockHTable threatIntelTable = (MockHTable) MockHBaseTableProvider.addToCache(threatIntelTableName, cf); - EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList>() {{ - add(new LookupKV<>(new EnrichmentKey("10.0.2.3", "10.0.2.3"), new EnrichmentValue(new HashMap<>()))); - }}); - - BloomAccessTracker bat = new BloomAccessTracker(threatIntelTableName, 100, 0.03); - PersistentAccessTracker pat = new PersistentAccessTracker(threatIntelTableName, "0", trackerTable, cf, bat, 0L); - lookup = new EnrichmentLookup(threatIntelTable, cf, pat); JSONParser jsonParser = new JSONParser(); expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString); } - @Test public void testEnrich() throws Exception { - ThreatIntelAdapter tia = new ThreatIntelAdapter(); - tia.lookup = lookup; SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class); - JSONObject actualMessage = tia.enrich(new CacheKey("ip_dst_addr", "10.0.2.3", broSc)); + JSONObject actualMessage = threatIntelAdapter.enrich(new CacheKey("ip_dst_addr", "10.0.2.3", broSc)); Assert.assertNotNull(actualMessage); Assert.assertEquals(expectedMessage, actualMessage); } @Test public void testEnrichNonString() throws Exception { - ThreatIntelAdapter tia = new ThreatIntelAdapter(); - tia.lookup = lookup; SensorEnrichmentConfig broSc = JSONUtils.INSTANCE.load(sourceConfigStr, SensorEnrichmentConfig.class); - JSONObject actualMessage = tia.enrich(new CacheKey("ip_dst_addr", "10.0.2.3", broSc)); + JSONObject actualMessage = threatIntelAdapter.enrich(new CacheKey("ip_dst_addr", "10.0.2.3", broSc)); Assert.assertNotNull(actualMessage); Assert.assertEquals(expectedMessage, actualMessage); - actualMessage = tia.enrich(new CacheKey("ip_dst_addr", 10L, broSc)); + actualMessage = threatIntelAdapter.enrich(new CacheKey("ip_dst_addr", 10L, broSc)); Assert.assertEquals(actualMessage,new JSONObject()); } + @Test + public void testMiss() { + JSONObject actual = threatIntelAdapter.enrich(new CacheKey("ip_dst_addr", "4.4.4.4", config)); + + // not a known IP in either the enrichment data + JSONObject expected = new JSONObject(); + Assert.assertEquals(expected, actual); + } + + @Test + public void testMissWithNonStringValue() { + JSONObject actual = threatIntelAdapter.enrich(new CacheKey("ip_dst_addr", 10L, config)); + + // not a known IP in either the enrichment data + JSONObject expected = new JSONObject(); + Assert.assertEquals(expected, actual); + } + + @Test + public void testNoEnrichmentDefined() { + JSONObject actual = threatIntelAdapter.enrich(new CacheKey("username", "ada_lovelace", config)); + + // no enrichment defined for the field 'username' + JSONObject expected = new JSONObject(); + Assert.assertEquals(expected, actual); + } + @Test public void testInitializeAdapter() { @@ -155,12 +169,14 @@ public void testInitializeAdapter() { config.withMillisecondsBetweenPersists(millionseconds); config.withTrackerHBaseCF(trackCf); config.withTrackerHBaseTable(trackTable); - config.withProviderImpl(ExceptionProvider.class.getName()); + config.withEnrichmentLookupFactory(new FakeEnrichmentLookupFactory(lookup)); + config.withConnectionFactory(new FakeHBaseConnectionFactory()); + ThreatIntelAdapter tia = new ThreatIntelAdapter(config); UnitTestHelper.setLog4jLevel(ThreatIntelAdapter.class, Level.FATAL); tia.initializeAdapter(null); UnitTestHelper.setLog4jLevel(ThreatIntelAdapter.class, Level.ERROR); - Assert.assertFalse(tia.isInitialized()); + Assert.assertTrue(tia.isInitialized()); } diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactoriesTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactoriesTest.java new file mode 100644 index 0000000000..1b865eb010 --- /dev/null +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/EnrichmentLookupFactoriesTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.lookup; + +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +public class EnrichmentLookupFactoriesTest { + + @Test + public void byEnumName() { + assertNotNull(EnrichmentLookupFactories.byName(EnrichmentLookupFactories.HBASE.name())); + } + + @Test + public void byClassName() { + assertNotNull(EnrichmentLookupFactories.byName(FakeEnrichmentLookupFactory.class.getName())); + } + + @Test(expected=IllegalStateException.class) + public void shouldFailWithInvalidName() { + EnrichmentLookupFactories.byName("this-is-an-invalid-name"); + } +} diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/FakeEnrichmentLookup.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/FakeEnrichmentLookup.java new file mode 100644 index 0000000000..e99994d0a3 --- /dev/null +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/FakeEnrichmentLookup.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.lookup; + +import org.apache.metron.enrichment.converter.EnrichmentKey; +import org.apache.metron.enrichment.converter.EnrichmentValue; +import org.apache.metron.enrichment.lookup.handler.KeyWithContext; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An {@link EnrichmentLookup} useful for testing. + * + *

Maintains a static, in-memory set of enrichments to mimic the behavior of + * an {@link EnrichmentLookup} that interacts with HBase. + */ +public class FakeEnrichmentLookup extends EnrichmentLookup implements Serializable { + + /** + * The available enrichments. This is static so that all + * instances 'see' the same set of enrichments. + */ + private static Map enrichments = Collections.synchronizedMap(new HashMap<>()); + + public FakeEnrichmentLookup() { + super(null, null, null); + } + + /** + * Add an enrichment. + * @param key The enrichment key. + * @param value The enrichment value. + * @return + */ + public FakeEnrichmentLookup withEnrichment(EnrichmentKey key, EnrichmentValue value) { + this.enrichments.put(key, value); + return this; + } + + /** + * Deletes all enrichments. + */ + public FakeEnrichmentLookup deleteAll() { + enrichments.clear(); + return this; + } + + @Override + public boolean exists(EnrichmentKey key, HBaseContext context, boolean logAccess) { + return enrichments.containsKey(key); + } + + @Override + public Iterable exists(Iterable> keys, boolean logAccess) throws IOException { + List results = new ArrayList<>(); + for(KeyWithContext keyWithContext: keys) { + EnrichmentKey key = keyWithContext.getKey(); + results.add(enrichments.containsKey(key)); + } + return results; + } + + @Override + public LookupKV get(EnrichmentKey key, HBaseContext context, boolean logAccess) { + EnrichmentValue value = enrichments.get(key); + return new LookupKV<>(key, value); + } + + @Override + public Iterable> get(Iterable> keys, boolean logAccess) throws IOException { + List> results = new ArrayList<>(); + for(KeyWithContext keyWithContext: keys) { + EnrichmentKey key = keyWithContext.getKey(); + HBaseContext context = keyWithContext.getContext(); + if(enrichments.containsKey(key)) { + results.add(get(key, context, logAccess)); + } + } + return results; + } + + + + @Override + public void close() throws IOException { + // nothing to do + } +} diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/FakeEnrichmentLookupFactory.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/FakeEnrichmentLookupFactory.java new file mode 100644 index 0000000000..6974a50af6 --- /dev/null +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/lookup/FakeEnrichmentLookupFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.lookup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker; +import org.apache.metron.hbase.client.HBaseConnectionFactory; + +/** + * Creates a {@link FakeEnrichmentLookup}. + */ +public class FakeEnrichmentLookupFactory implements EnrichmentLookupFactory { + + private FakeEnrichmentLookup lookup; + + public FakeEnrichmentLookupFactory() { + this(new FakeEnrichmentLookup()); + } + + public FakeEnrichmentLookupFactory(FakeEnrichmentLookup lookup) { + this.lookup = lookup; + } + + @Override + public EnrichmentLookup create(HBaseConnectionFactory connectionFactory, + Configuration configuration, + String tableName, + String columnFamily, + AccessTracker accessTracker) { + return lookup; + } +} diff --git a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java index dbbc7d5dbc..3ec08d764f 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java +++ b/metron-platform/metron-enrichment/metron-enrichment-common/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java @@ -19,59 +19,84 @@ package org.apache.metron.enrichment.stellar; import com.google.common.collect.ImmutableMap; -import org.apache.metron.hbase.mock.MockHTable; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookup; +import org.apache.metron.hbase.client.FakeHBaseConnectionFactory; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.DefaultVariableResolver; import org.apache.metron.stellar.dsl.ParseException; -import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.metron.stellar.common.StellarProcessor; -import org.apache.metron.enrichment.converter.EnrichmentHelper; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; -import org.apache.metron.enrichment.lookup.LookupKV; +import org.apache.metron.stellar.dsl.VariableResolver; +import org.apache.metron.stellar.dsl.functions.FunctionalFunctions; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.google.common.collect.ImmutableMap.of; +import static org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions.EnrichmentExists; +import static org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions.EnrichmentGet; + public class SimpleHBaseEnrichmentFunctionsTest { - private final String hbaseTableName = "enrichments"; private static final String ENRICHMENT_TYPE = "et"; - private String cf = "cf"; private static Context context; - - + private EnrichmentExists existsFunction; + private EnrichmentGet getFunction; @Before public void setup() throws Exception { + // provides the enrichment data to the functions + FakeEnrichmentLookup lookup = new FakeEnrichmentLookup() + .withEnrichment(new EnrichmentKey(ENRICHMENT_TYPE, "indicator0"), new EnrichmentValue(of("key0", "value0"))) + .withEnrichment(new EnrichmentKey(ENRICHMENT_TYPE, "indicator1"), new EnrichmentValue(of("key1", "value1"))) + .withEnrichment(new EnrichmentKey(ENRICHMENT_TYPE, "indicator2"), new EnrichmentValue(of("key2", "value2"))) + .withEnrichment(new EnrichmentKey(ENRICHMENT_TYPE, "indicator3"), new EnrichmentValue(of("key3", "value3"))) + .withEnrichment(new EnrichmentKey(ENRICHMENT_TYPE, "indicator4"), new EnrichmentValue(of("key4", "value4"))); - final MockHTable hbaseTable = (MockHTable) MockHBaseTableProvider.addToCache(hbaseTableName, cf); - EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList>() {{ - for(int i = 0;i < 5;++i) { - add(new LookupKV<>(new EnrichmentKey(ENRICHMENT_TYPE, "indicator" + i) - , new EnrichmentValue(ImmutableMap.of("key" + i, "value" + i)) - ) - ); - } - }}); context = new Context.Builder() .with( Context.Capabilities.GLOBAL_CONFIG - , () -> ImmutableMap.of( SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF - , MockHBaseTableProvider.class.getName() + , () -> ImmutableMap.of( SimpleHBaseEnrichmentFunctions.CONNECTION_FACTORY_IMPL_CONF + , FakeHBaseConnectionFactory.class.getName() ) ) .build(); + + EnrichmentLookupFactory factory = (connFact, conf, tableName, colFam, accessTracker) -> lookup; + Configuration configuration = HBaseConfiguration.create(); + + // the ENRICHMENT_EXIST function to test + existsFunction = new EnrichmentExists(factory, configuration); + existsFunction.initialize(context); + + // the ENRICHMENT_GET function to test + getFunction = new EnrichmentGet(factory, configuration); + getFunction.initialize(context); } - public Object run(String rule, Map variables) throws Exception { + + public Object run(String rule, Map variables) { StellarProcessor processor = new StellarProcessor(); Assert.assertTrue(rule + " not valid.", processor.validate(rule, context)); - return processor.parse(rule, new DefaultVariableResolver(x -> variables.get(x),x -> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context); + + VariableResolver variableResolver = new DefaultVariableResolver( + x -> variables.get(x), + x -> variables.containsKey(x)); + FunctionResolver functionResolver = new SimpleFunctionResolver() + .withClass(EnrichmentGet.class) + .withClass(EnrichmentExists.class) + .withClass(FunctionalFunctions.Map.class) + .withInstance(existsFunction) + .withInstance(getFunction); + return processor.parse(rule, variableResolver, functionResolver, context); } @Test diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml index fe21399b89..7f1175e686 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml +++ b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml @@ -34,6 +34,19 @@ + + + + com.fasterxml.jackson.core + jackson-databind + ${global_jackson_version} + + + com.fasterxml.jackson.core + jackson-annotations + ${global_jackson_version} + + @@ -202,16 +215,6 @@ caffeine ${global_caffeine_version} - - com.fasterxml.jackson.core - jackson-databind - ${global_jackson_version} - - - com.fasterxml.jackson.core - jackson-annotations - ${global_jackson_version} - org.slf4j slf4j-api diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties index 5338ead22c..f93405e066 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties +++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties @@ -41,12 +41,12 @@ enrichment.join.cache.size=100000 threat.intel.join.cache.size=100000 ##### Enrichment ##### -hbase.provider.impl=org.apache.metron.hbase.HTableProvider enrichment.simple.hbase.table=enrichment enrichment.simple.hbase.cf=t enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\ {"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\ {"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}] +enrichment.lookup.factory="HBASE" ##### Threat Intel ##### threat.intel.tracker.table=access_tracker diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2 b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2 index 8c28c49c48..491818d616 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2 +++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2 @@ -34,10 +34,10 @@ enrichment.error.topic={{enrichment_error_topic}} threat.intel.error.topic={{threatintel_error_topic}} ##### Enrichment ##### -hbase.provider.impl={{enrichment_hbase_provider_impl}} enrichment.simple.hbase.table={{enrichment_hbase_table}} enrichment.simple.hbase.cf={{enrichment_hbase_cf}} enrichment.host.known_hosts={{enrichment_host_known_hosts}} +enrichment.lookup.factory={{enrichment_lookup_factory}} ##### Threat Intel ##### threat.intel.tracker.table={{threatintel_hbase_table}} diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml index 45b05cfdc6..9f46fa6e2d 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml +++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml @@ -78,15 +78,15 @@ components: - id: "simpleHBaseEnrichmentConfig" className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig" configMethods: - - name: "withProviderImpl" - args: - - "${hbase.provider.impl}" - name: "withHBaseTable" args: - - "${enrichment.simple.hbase.table}" + - "${enrichment.simple.hbase.table}" - name: "withHBaseCF" args: - - "${enrichment.simple.hbase.cf}" + - "${enrichment.simple.hbase.cf}" + - name: "withEnrichmentLookupFactory" + args: + - "${enrichment.lookup.factory}" - id: "simpleHBaseEnrichmentAdapter" className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter" @@ -148,21 +148,21 @@ components: - id: "simpleHBaseThreatIntelConfig" className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig" configMethods: - - name: "withProviderImpl" - args: - - "${hbase.provider.impl}" - name: "withTrackerHBaseTable" args: - - "${threat.intel.tracker.table}" + - "${threat.intel.tracker.table}" - name: "withTrackerHBaseCF" args: - - "${threat.intel.tracker.cf}" + - "${threat.intel.tracker.cf}" - name: "withHBaseTable" args: - - "${threat.intel.simple.hbase.table}" + - "${threat.intel.simple.hbase.table}" - name: "withHBaseCF" args: - - "${threat.intel.simple.hbase.cf}" + - "${threat.intel.simple.hbase.cf}" + - name: "withEnrichmentLookupFactory" + args: + - "${enrichment.lookup.factory}" - id: "simpleHBaseThreatIntelAdapter" className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter" diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index a138c0d897..105b4a02d1 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -24,20 +24,21 @@ import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase; import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase; -import org.apache.metron.enrichment.converter.EnrichmentHelper; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; -import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator; +import org.apache.metron.enrichment.lookup.EnrichmentLookupFactory; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookup; +import org.apache.metron.enrichment.lookup.FakeEnrichmentLookupFactory; +import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackers; import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions; import org.apache.metron.enrichment.utils.ThreatIntelUtils; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.ProcessorResult; @@ -48,6 +49,7 @@ import org.apache.metron.integration.processors.KafkaMessageSet; import org.apache.metron.integration.processors.KafkaProcessor; import org.apache.metron.integration.utils.TestUtils; +import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.parser.ParseException; import org.junit.Assert; @@ -67,6 +69,9 @@ import java.util.Set; import java.util.stream.Stream; +import static org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions.EnrichmentExists; +import static org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions.EnrichmentGet; + /** * Integration test for the enrichment topology. */ @@ -88,7 +93,6 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { public static final String DEFAULT_DMACODE= "test dmaCode"; public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE); public static final String cf = "cf"; - public static final String trackerHBaseTableName = "tracker"; public static final String threatIntelTableName = "threat_intel"; public static final String enrichmentsTableName = "enrichments"; @@ -158,7 +162,7 @@ public Properties getTopologyProperties() { setProperty("threatintel_error_topic", ERROR_TOPIC); // enrichment - setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName()); + setProperty("enrichment_lookup_factory", FakeEnrichmentLookupFactory.class.getName()); setProperty("enrichment_hbase_table", enrichmentsTableName); setProperty("enrichment_hbase_cf", cf); setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," + @@ -201,10 +205,7 @@ public void test() throws Exception { { File globalConfig = new File(enrichmentConfigPath, "global.json"); Map config = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER); - config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, MockHBaseTableProvider.class.getName()); - config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, "PERSISTENT_BLOOM"); - config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, trackerHBaseTableName); - config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf); + config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, AccessTrackers.NOOP); config.put(GeoLiteCityDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath()); config.put(GeoLiteAsnDatabase.ASN_HDFS_FILE, asnHdfsFile.getAbsolutePath()); globalConfigStr = JSONUtils.INSTANCE.toJSON(config, true); @@ -214,19 +215,21 @@ public void test() throws Exception { .withGlobalConfig(globalConfigStr) .withEnrichmentConfigsPath(enrichmentConfigPath); - //create MockHBaseTables - final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(trackerHBaseTableName, cf); - final MockHTable threatIntelTable = (MockHTable) MockHBaseTableProvider.addToCache(threatIntelTableName, cf); - EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList>() {{ - add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<>()))); - }}); - final MockHTable enrichmentTable = (MockHTable) MockHBaseTableProvider.addToCache(enrichmentsTableName, cf); - EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList>() {{ - add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3") - , new EnrichmentValue(PLAYFUL_ENRICHMENT) - ) - ); - }}); + // add some enrichments to a set of global, static enrichment values to use during these tests + FakeEnrichmentLookup lookup = new FakeEnrichmentLookup() + .withEnrichment( + new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), + new EnrichmentValue(new HashMap<>())) + .withEnrichment( + new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3"), + new EnrichmentValue(PLAYFUL_ENRICHMENT)); + EnrichmentLookupFactory lookupCreator = (v, w, x, y, z) -> lookup; + + // the enrichment stellar functions need to access the same global, static enrichment values + Configuration conf = HBaseConfiguration.create(); + StellarFunctions.FUNCTION_RESOLVER() + .withInstance(new EnrichmentGet(lookupCreator, conf)) + .withInstance(new EnrichmentExists(lookupCreator, conf)); FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() .withTopologyLocation(new File(fluxPath())) @@ -235,7 +238,6 @@ public void test() throws Exception { .withTopologyProperties(topologyProperties) .build(); - //UnitTestHelper.verboseLogging(); ComponentRunner runner = new ComponentRunner.Builder() .withComponent("zk",zkServerComponent) diff --git a/metron-platform/metron-hbase/metron-hbase-common/pom.xml b/metron-platform/metron-hbase/metron-hbase-common/pom.xml index 84b24d3169..553b790fbc 100644 --- a/metron-platform/metron-hbase/metron-hbase-common/pom.xml +++ b/metron-platform/metron-hbase/metron-hbase-common/pom.xml @@ -52,6 +52,10 @@ org.slf4j slf4j-log4j12 + + com.google.guava + guava + @@ -89,6 +93,10 @@ org.slf4j slf4j-log4j12 + + com.google.guava + guava + @@ -97,6 +105,12 @@ tests ${global_hadoop_version} provided + + + com.google.guava + guava + + org.apache.hbase