Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.metron.common.Constants;
import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.pcap.filter.query.QueryPcapFilter;
Expand Down Expand Up @@ -120,6 +122,7 @@ public Response getPcapsByIdentifiers(

throws IOException {
PcapsResponse response = new PcapsResponse();
SequenceFileIterable results = null;
try {
if (startTime < 0) {
startTime = 0L;
Expand All @@ -137,7 +140,7 @@ public Response getPcapsByIdentifiers(
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Query received: " + query);
}
response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
Expand All @@ -146,13 +149,17 @@ public Response getPcapsByIdentifiers(
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
, new QueryPcapFilter.Configurator()
)
);

response.setPcaps(results != null ? Lists.newArrayList(results) : null);
} catch (Exception e) {
LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
e);
throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
} finally {
if (null != results) {
results.cleanup();
}
}

// return http status '200 OK' along with the complete pcaps response file,
Expand Down Expand Up @@ -205,6 +212,7 @@ public Response getPcapsByIdentifiers(

final boolean includeReverseTrafficF = includeReverseTraffic;
PcapsResponse response = new PcapsResponse();
SequenceFileIterable results = null;
try {
if(startTime < 0) {
startTime = 0L;
Expand Down Expand Up @@ -237,22 +245,26 @@ public Response getPcapsByIdentifiers(
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet()));
}
response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
, numReducers
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
, new FixedPcapFilter.Configurator()
)
);
results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
, numReducers
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
, new FixedPcapFilter.Configurator()
);
response.setPcaps(results != null ? Lists.newArrayList(results) : null);

} catch (Exception e) {
LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
e);
throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
} finally {
if (null != results) {
results.cleanup();
}
}

// return http status '200 OK' along with the complete pcaps response file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.common.hadoop.SequenceFileIterable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.mr.PcapJob;
Expand All @@ -31,7 +32,6 @@

import java.io.IOException;
import java.util.EnumMap;
import java.util.List;

public class PcapReceiverImplRestEasyTest {

Expand All @@ -44,7 +44,7 @@ public static class MockQueryHandler<R> extends PcapJob {
PcapFilterConfigurator<R> filterImpl;

@Override
public <T> List<byte[]> query( Path basePath
public <T> SequenceFileIterable query(Path basePath
, Path baseOutputPath
, long beginNS
, long endNS
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.metron.common.hadoop;

import com.google.common.collect.Iterators;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import static java.lang.String.format;

public class SequenceFileIterable implements Iterable<byte[]> {
private static final Logger LOGGER = Logger.getLogger(SequenceFileIterable.class);
private List<Path> files;
private Configuration config;

public SequenceFileIterable(List<Path> files, Configuration config) {
this.files = files;
this.config = config;
}

@Override
public Iterator<byte[]> iterator() {
return Iterators.concat(getIterators(files, config));
}

private Iterator<byte[]>[] getIterators(List<Path> files, Configuration config) {
return files.stream().map(f -> new SequenceFileIterator(f, config)).toArray(Iterator[]::new);
}

/**
* Cleans up all files read by this Iterable
*
* @return true if success, false if any files were not deleted
* @throws IOException
*/
public boolean cleanup() throws IOException {
FileSystem fileSystem = FileSystem.get(config);
boolean success = true;
for (Path file : files) {
success &= fileSystem.delete(file, false);
}
return success;
}

private static class SequenceFileIterator implements Iterator<byte[]> {
private Path path;
private Configuration config;
private SequenceFile.Reader reader;
private LongWritable key = new LongWritable();
private BytesWritable value = new BytesWritable();
private byte[] next;
private boolean finished = false;

public SequenceFileIterator(Path path, Configuration config) {
this.path = path;
this.config = config;
}

@Override
public boolean hasNext() {
if (!finished && null == reader) {
try {
reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(path));
LOGGER.debug("Writing file: " + path.toString());
} catch (IOException e) {
throw new RuntimeException("Failed to get reader", e);
}
} else {
LOGGER.debug(format("finished=%s, reader=%s, next=%s", finished, reader, next));
}
try {
//ensure hasnext is idempotent
if (!finished) {
if (null == next && reader.next(key, value)) {
next = value.copyBytes();
} else if (null == next) {
close();
}
}
} catch (IOException e) {
close();
throw new RuntimeException("Failed to get next record", e);
}
return (null != next);
}

private void close() {
LOGGER.debug("Closing file: " + path.toString());
finished = true;
try {
if (reader != null) {
reader.close();
reader = null;
}
} catch (IOException e) {
// ah well, we tried...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log this Exception? I'm not sure we can do anything about it, but it would be nice to be able to ensure we can see it.

LOGGER.warn("Error closing file", e);
}
}

@Override
public byte[] next() {
byte[] ret = null;
if (hasNext()) {
ret = next;
next = null; //don't want same record more than once
} else {
throw new NoSuchElementException("No more records");
}
return ret;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@
import java.text.SimpleDateFormat;

public class CliConfig {
public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
private boolean showHelp;
private String basePath;
private String baseOutputPath;
private long startTime;
private long endTime;
private int numReducers = 0;
private int numReducers;
private int numRecordsPerFile;
private DateFormat dateFormat;

public CliConfig() {
showHelp = false;
basePath = BASE_PATH_DEFAULT;
baseOutputPath = BASE_OUTPUT_PATH_DEFAULT;
basePath = "";
baseOutputPath = "";
startTime = -1L;
endTime = -1L;
numReducers = 0;
}

public int getNumReducers() {
Expand Down Expand Up @@ -100,4 +100,12 @@ public DateFormat getDateFormat() {
public void setNumReducers(int numReducers) {
this.numReducers = numReducers;
}

public int getNumRecordsPerFile() {
return numRecordsPerFile;
}

public void setNumRecordsPerFile(int numRecordsPerFile) {
this.numRecordsPerFile = numRecordsPerFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
* Provides common required fields for the PCAP filter jobs
*/
public class CliParser {
public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
public static final int NUM_REDUCERS_DEFAULT = 10;
public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
private CommandLineParser parser;

public CliParser() {
Expand All @@ -33,10 +37,11 @@ public CliParser() {
public Options buildOptions() {
Options options = new Options();
options.addOption(newOption("h", "help", false, "Display help"));
options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT)));
options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true));
options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
return options;
Expand All @@ -61,9 +66,13 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa
}
if (commandLine.hasOption("base_path")) {
config.setBasePath(commandLine.getOptionValue("base_path"));
} else {
config.setBasePath(BASE_PATH_DEFAULT);
}
if (commandLine.hasOption("base_output_path")) {
config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
} else {
config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT);
}
if (commandLine.hasOption("start_time")) {
try {
Expand All @@ -83,7 +92,14 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa
config.setNumReducers(numReducers);
}
else {
config.setNumReducers(10);
config.setNumReducers(NUM_REDUCERS_DEFAULT);
}
if (commandLine.hasOption("records_per_file")) {
int numRecordsPerFile = Integer.parseInt(commandLine.getOptionValue("records_per_file"));
config.setNumRecordsPerFile(numRecordsPerFile);
}
else {
config.setNumRecordsPerFile(NUM_RECORDS_PER_FILE_DEFAULT);
}
if (commandLine.hasOption("end_time")) {
try {
Expand Down
Loading