Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
)

# Metron HBase configuration
enrichment_hbase_provider_impl = 'org.apache.metron.hbase.HTableProvider'
enrichment_lookup_factory = 'HBASE'
enrichment_hbase_table = status_params.enrichment_hbase_table
enrichment_hbase_cf = status_params.enrichment_hbase_cf
# coprocessor config for enrichment list
Expand Down
7 changes: 7 additions & 0 deletions metron-platform/metron-data-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-enrichment-common</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-hbase-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
*/
package org.apache.metron.dataloads.hbase.mr;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.metron.enrichment.lookup.LookupKey;
import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackerUtil;
import org.apache.metron.hbase.client.HBaseConnectionFactory;

import java.io.IOException;

Expand All @@ -31,22 +33,32 @@ public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
public static final String ACCESS_TRACKER_CF_CONF = "access_tracker_cf";
public static final String TIMESTAMP_CONF = "access_tracker_timestamp";
public static final String ACCESS_TRACKER_NAME_CONF = "access_tracker_name";
AccessTracker tracker;
private AccessTracker tracker;
private HBaseConnectionFactory connectionFactory;

/**
 * Creates a mapper whose HBase connections are obtained from a newly
 * constructed {@link HBaseConnectionFactory}.
 */
public PrunerMapper() {
this.connectionFactory = new HBaseConnectionFactory();
}

@Override
public void setup(Context context) throws IOException
{
public void setup(Context context) throws IOException {
String atTable = context.getConfiguration().get(ACCESS_TRACKER_TABLE_CONF);
String atCF = context.getConfiguration().get(ACCESS_TRACKER_CF_CONF);
String atName = context.getConfiguration().get(ACCESS_TRACKER_NAME_CONF);
HTable table = new HTable(context.getConfiguration(), atTable);
long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1);
if(timestamp < 0) {
throw new IllegalStateException("Must specify a timestamp that is positive.");
}
try {
tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp));
} catch (Throwable e) {
throw new IllegalStateException("Unable to load the accesstrackers from the directory", e);

// setup the HBase connection
try(Connection connection = connectionFactory.createConnection(context.getConfiguration());
Table table = connection.getTable(TableName.valueOf(atTable))) {

long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1);
if(timestamp < 0) {
throw new IllegalStateException("Must specify a timestamp that is positive.");
}
try {
tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp));
} catch (Throwable e) {
throw new IllegalStateException("Unable to load the accesstrackers from the directory", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Table;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.enrichment.converter.HbaseConverter;

import java.io.IOException;

public class HBaseExtractorState {
private HTableInterface table;
private Table table;
private Extractor extractor;
private HbaseConverter converter;
private FileSystem fs;
private String cf;

public HBaseExtractorState(HTableInterface table, String cf, Extractor extractor, HbaseConverter converter, Configuration config) {
public HBaseExtractorState(Table table, String cf, Extractor extractor, HbaseConverter converter, Configuration config) {
this.table = table;
this.extractor = extractor;
this.converter = converter;
Expand All @@ -48,7 +48,7 @@ public String getCf() {
return cf;
}

public HTableInterface getTable() {
public Table getTable() {
return table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.metron.common.utils.cli.CLIOptions;
import org.apache.metron.common.utils.file.ReaderSpliterator;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
import org.apache.metron.common.utils.cli.CLIOptions;
import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
import org.apache.metron.hbase.client.HBaseConnectionFactory;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -40,11 +41,21 @@

public abstract class AbstractLocalImporter<OPTIONS_T extends Enum<OPTIONS_T> & CLIOptions, STATE_T> implements Importer<OPTIONS_T> {

private HBaseConnectionFactory connectionFactory;
private Connection connection;

/**
 * Creates an importer that opens its HBase connection through the given factory.
 *
 * @param connectionFactory factory that {@code importData} calls to create
 *                          the HBase {@link Connection}; stored as-is, no
 *                          null check is performed here
 */
public AbstractLocalImporter(HBaseConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

@Override
public void importData( final EnumMap<OPTIONS_T, Optional<Object>> config
, final ExtractorHandler handler
, final Configuration hadoopConfig
) throws IOException, InvalidWriterOutput {

connection = connectionFactory.createConnection(hadoopConfig);

validateState(config, handler);
ThreadLocal<STATE_T> state = createState(config, hadoopConfig, handler);
boolean quiet = isQuiet(config);
Expand All @@ -64,6 +75,17 @@ public void importData( final EnumMap<OPTIONS_T, Optional<Object>> config
}
}

/**
 * Returns the HBase connection currently held by this importer.
 *
 * <p>The field is only assigned inside {@code importData}, so this returns
 * {@code null} until that method has created a connection. {@code close()}
 * does not clear the field, so after closing this still returns the same
 * (now closed) instance.
 *
 * @return the connection created by {@code importData}, or {@code null} if
 *         none has been created yet
 */
protected Connection getConnection() {
return connection;
}

/**
 * Closes the HBase connection if one has been created; does nothing otherwise.
 *
 * <p>The {@code connection} field is left pointing at the closed instance.
 *
 * @throws IOException if closing the connection fails
 */
@Override
public void close() throws IOException {
// connection stays null until importData has run, so guard against that case
if(connection != null) {
connection.close();
}
}

protected abstract List<String> getInputs(final EnumMap<OPTIONS_T, Optional<Object>> config);
protected abstract boolean isQuiet(final EnumMap<OPTIONS_T, Optional<Object>> config);
protected abstract int batchSize(final EnumMap<OPTIONS_T, Optional<Object>> config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@
package org.apache.metron.dataloads.nonbulk.flatfile.importer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
import org.apache.metron.enrichment.converter.EnrichmentConverter;

import java.io.Closeable;
import java.io.IOException;
import java.util.EnumMap;
import java.util.List;
import java.util.Optional;

public interface Importer<OPTIONS_T extends Enum<OPTIONS_T>> {
public interface Importer<OPTIONS_T extends Enum<OPTIONS_T>> extends Closeable {
void importData(EnumMap<OPTIONS_T, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException, InvalidWriterOutput;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,32 @@
package org.apache.metron.dataloads.nonbulk.flatfile.importer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
import org.apache.metron.dataloads.nonbulk.flatfile.HBaseExtractorState;
import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
import org.apache.metron.enrichment.converter.EnrichmentConverter;
import org.apache.metron.enrichment.converter.HbaseConverter;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.client.HBaseConnectionFactory;

import java.io.*;
import java.util.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Optional;

public class LocalImporter extends AbstractLocalImporter<LoadOptions, HBaseExtractorState> {

public interface HTableProviderRetriever {
HTableProvider retrieve();
}

HTableProviderRetriever provider;

public LocalImporter(HTableProviderRetriever provider) {
this.provider = provider;
public LocalImporter(HBaseConnectionFactory connectionFactory) {
super(connectionFactory);
}

public LocalImporter() {
this(() -> new HTableProvider());
this(new HBaseConnectionFactory());
}


Expand Down Expand Up @@ -86,8 +84,9 @@ protected ThreadLocal<HBaseExtractorState> createState(EnumMap<LoadOptions, Opti
@Override
protected HBaseExtractorState initialValue() {
try {
String tableName = (String) config.get(LoadOptions.HBASE_TABLE).get();
String cf = (String) config.get(LoadOptions.HBASE_CF).get();
HTableInterface table = provider.retrieve().getTable(hadoopConfig, (String) config.get(LoadOptions.HBASE_TABLE).get());
Table table = getConnection().getTable(TableName.valueOf(tableName));
return new HBaseExtractorState(table, cf, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
} catch (IOException e1) {
throw new IllegalStateException("Unable to get table: " + e1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writers;
import org.apache.metron.hbase.client.HBaseConnectionFactory;

import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -43,6 +44,7 @@ public class LocalSummarizer extends AbstractLocalImporter<SummarizeOptions, Loc
List<SummarizationState> stateList;

/**
 * Creates a summarizer that uses a newly constructed
 * {@link HBaseConnectionFactory} and starts with an empty, synchronized
 * list of summarization states.
 */
public LocalSummarizer() {
super(new HBaseConnectionFactory());
// wrapped in a synchronized list; presumably appended to from several threads — confirm against createState
stateList = Collections.synchronizedList(new ArrayList<>());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ public void importData(EnumMap<LoadOptions, Optional<Object>> config
throw new IllegalStateException("Unable to complete job: " + e.getMessage(), e);
}
}

/**
 * No-op: this implementation releases nothing when closed.
 *
 * <p>NOTE(review): presumably the job started by {@code importData} manages
 * its own resources, which is why there is nothing to release here — confirm.
 */
@Override
public void close() {
// nothing to do
}
}
Loading