From a06fd0b918bfd4d363d58ffb5c5c2dba015e7c57 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Wed, 14 Sep 2016 21:54:50 -0400 Subject: [PATCH 1/3] Refactor pcap cli tests. Fix num reducers --- .../apache/metron/pcap/query/CliConfig.java | 9 +- .../apache/metron/pcap/query/CliParser.java | 15 ++- .../org/apache/metron/pcap/query/PcapCli.java | 14 +-- .../metron/pcap/query/ResultsWriter.java | 4 +- .../apache/metron/pcap/query/PcapCliTest.java | 98 +++++++++++-------- .../org/apache/metron/pcap/mr/PcapJob.java | 4 +- 6 files changed, 85 insertions(+), 59 deletions(-) diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java index f8ab0ac020..b600a3eded 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java @@ -23,22 +23,21 @@ import java.text.SimpleDateFormat; public class CliConfig { - public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap"; - public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp"; private boolean showHelp; private String basePath; private String baseOutputPath; private long startTime; private long endTime; - private int numReducers = 0; + private int numReducers; private DateFormat dateFormat; public CliConfig() { showHelp = false; - basePath = BASE_PATH_DEFAULT; - baseOutputPath = BASE_OUTPUT_PATH_DEFAULT; + basePath = ""; + baseOutputPath = ""; startTime = -1L; endTime = -1L; + numReducers = 0; } public int getNumReducers() { diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index ea6f8e7df2..b046ac6261 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -24,6 +24,9 @@ * Provides commmon required fields for the PCAP filter jobs */ public class CliParser { + public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap"; + public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp"; + public static final int NUM_REDUCERS_DEFAULT = 10; private CommandLineParser parser; public CliParser() { @@ -33,10 +36,10 @@ public CliParser() { public Options buildOptions() { Options options = new Options(); options.addOption(newOption("h", "help", false, "Display help")); - options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT))); - options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT))); + options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT))); + options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); - options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true)); + options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT))); options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time.")); options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch.")); return options; @@ -61,9 +64,13 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa } if (commandLine.hasOption("base_path")) { config.setBasePath(commandLine.getOptionValue("base_path")); + } else { + config.setBasePath(BASE_PATH_DEFAULT); } if (commandLine.hasOption("base_output_path")) { config.setBaseOutputPath(commandLine.getOptionValue("base_output_path")); + } else { + config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT); } if (commandLine.hasOption("start_time")) { try { @@ -83,7 +90,7 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa config.setNumReducers(numReducers); } else { - config.setNumReducers(10); + config.setNumReducers(NUM_REDUCERS_DEFAULT); } if (commandLine.hasOption("end_time")) { try { diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index d96e16650b..790eb442fd 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; public class PcapCli { private static final Logger LOGGER = LoggerFactory.getLogger(PcapCli.class); @@ -59,7 +58,7 @@ public int run(String[] args) { return -1; } String jobType = args[0]; - List results = new ArrayList<>(); + Iterable results = new ArrayList<>(); String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); Configuration hadoopConf = new Configuration(); String[] otherArgs = null; @@ -76,6 +75,7 @@ public int run(String[] args) { config = fixedParser.parse(otherArgs); } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); + System.err.flush(); fixedParser.printHelp(); return -1; } @@ -148,12 +148,12 @@ public int run(String[] args) { String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"); String outFileName = String.format("pcap-data-%s.pcap", timestamp); try { - if(results.size() > 0) { +// if(results.size() > 0) { resultsWriter.write(results, outFileName); - } - else { - System.out.println("No results returned."); - } +// } +// else { +// System.out.println("No results returned."); +// } } catch (IOException e) { LOGGER.error("Unable to write file", e); return -1; diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java index ab11770b01..ff2539c6f6 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java @@ -27,14 +27,14 @@ public class ResultsWriter { - public void write(List pcaps, String outPath) throws IOException { + public void write(Iterable pcaps, String outPath) throws IOException { File out = new File(outPath); try (FileOutputStream fos = new FileOutputStream(out)) { fos.write(mergePcaps(pcaps)); } } - public byte[] mergePcaps(List pcaps) throws IOException { + public byte[] mergePcaps(Iterable pcaps) throws IOException { if (pcaps == null) { return new byte[]{}; } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 4d6432e172..920db5a322 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -71,13 +71,12 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep "-ip_dst_addr", "192.168.1.2", "-ip_src_port", "8081", "-ip_dst_port", "8082", - "-protocol", "6", - "-num_reducers", "10" + "-protocol", "6" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); - Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT); - Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT); + Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); + Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); EnumMap query = new EnumMap(Constants.Fields.class) {{ put(Constants.Fields.SRC_ADDR, "192.168.1.1"); put(Constants.Fields.DST_ADDR, "192.168.1.2"); @@ -187,13 +186,12 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep String[] args = { "query", "-start_time", "500", - "-num_reducers", "10", "-query", "some query string" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); - Path base_path = new Path(CliConfig.BASE_PATH_DEFAULT); - Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT); + Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); + Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); String query = "some query string"; when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); @@ -229,54 +227,76 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio Mockito.verify(resultsWriter).write(pcaps, "pcap-data-20160615183527162+0000.pcap"); } + // INVALID OPTION CHECKS + @Test public void invalid_fixed_filter_arg_prints_help() throws Exception { + String[] args = { + "fixed", + "-start_time", "500", + "-end_time", "1000", + "-num_reducers", "10", + "-base_path", "/base/path", + "-base_output_path", "/base/output/path", + "-query", "THIS IS AN ERROR" + }; + assertCliError(args, "Fixed", "Unrecognized option: -query"); + } + + /** + * + * @param args PcapJob args + * @param type Fixed|Query + * @param optMsg Expected error message + */ + public void assertCliError(String[] args, String type, String optMsg) { PrintStream originalOutStream = System.out; + PrintStream originalErrOutStream = System.err; try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintStream testStream = new PrintStream(new BufferedOutputStream(bos)); - System.setOut(testStream); - String[] args = { - "fixed", - "-start_time", "500", - "-end_time", "1000", - "-num_reducers", "10", - "-base_path", "/base/path", - "-base_output_path", "/base/output/path", - "-query", "THIS IS AN ERROR" - }; + PrintStream outStream = new PrintStream(new BufferedOutputStream(bos)); + System.setOut(outStream); + + ByteArrayOutputStream ebos = new ByteArrayOutputStream(); + PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos)); + System.setErr(errOutStream); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); assertThat("Expect errors on run", cli.run(args), equalTo(-1)); - assertThat(bos.toString(), bos.toString().contains("usage: Fixed filter options"), equalTo(true)); + assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true)); + assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true)); } finally { System.setOut(originalOutStream); + System.setErr(originalErrOutStream); } } @Test public void invalid_query_filter_arg_prints_help() throws Exception { - PrintStream originalOutStream = System.out; - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintStream outStream = new PrintStream(new BufferedOutputStream(bos)); - System.setOut(outStream); - String[] args = { - "query", - "-start_time", "500", - "-end_time", "1000", - "-num_reducers", "10", - "-base_path", "/base/path", - "-base_output_path", "/base/output/path", - "-ip_src_addr", "THIS IS AN ERROR" - }; + String[] args = { + "query", + "-start_time", "500", + "-end_time", "1000", + "-num_reducers", "10", + "-base_path", "/base/path", + "-base_output_path", "/base/output/path", + "-ip_src_addr", "THIS IS AN ERROR" + }; + assertCliError(args, "Query", ""); + } - PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); - assertThat("Expect errors on run", cli.run(args), equalTo(-1)); - assertThat(bos.toString(), bos.toString().contains("usage: Query filter options"), equalTo(true)); - } finally { - System.setOut(originalOutStream); - } + @Test + public void missing_start_time_arg_prints_error_and_help() throws Exception { + String[] args = { + "fixed", + "-ip_src_addr", "192.168.1.1", + "-ip_dst_addr", "192.168.1.2", + "-ip_src_port", "8081", + "-ip_dst_port", "8082", + "-protocol", "6", + "-num_reducers", "10" + }; + assertCliError(args, "Fixed", "Missing required option: st"); } } diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index cce40748c2..b2e03fb2d9 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -176,7 +176,7 @@ public Iterable getPaths(FileSystem fs, Path basePath, long begin, long return ret; } - private List readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { + private Iterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { List ret = new ArrayList<>(); for(RemoteIterator it= fs.listFiles(outputPath, false);it.hasNext();) { Path p = it.next().getPath(); @@ -201,7 +201,7 @@ private List readResults(Path outputPath, Configuration config, FileSyst return ret; } - public List query(Path basePath + public Iterable query(Path basePath , Path baseOutputPath , long beginNS , long endNS From cc06349495315a6e1e7b393cd4871bf3e855bc3b Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Fri, 16 Sep 2016 00:43:43 -0400 Subject: [PATCH 2/3] METRON-257 Enable pcap result pagination from the Pcap CLI --- .../pcapservice/PcapReceiverImplRestEasy.java | 38 +++-- .../PcapReceiverImplRestEasyTest.java | 4 +- .../common/hadoop/SequenceFileIterable.java | 138 ++++++++++++++++++ .../apache/metron/pcap/query/CliConfig.java | 9 ++ .../apache/metron/pcap/query/CliParser.java | 9 ++ .../org/apache/metron/pcap/query/PcapCli.java | 35 +++-- .../metron/pcap/query/ResultsWriter.java | 4 +- .../org/apache/metron/pcap/PcapJobTest.java | 34 ++++- .../PcapTopologyIntegrationTest.java | 48 +++--- .../apache/metron/pcap/query/PcapCliTest.java | 41 ++++-- .../org/apache/metron/pcap/mr/PcapJob.java | 43 +++--- pom.xml | 2 +- 12 files changed, 312 insertions(+), 93 deletions(-) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java index 18b5dc92d9..5a2a0aecc5 100644 --- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java +++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -19,11 +19,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.metron.common.Constants; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; @@ -120,6 +122,7 @@ public Response getPcapsByIdentifiers( throws IOException { PcapsResponse response = new PcapsResponse(); + SequenceFileIterable results = null; try { if (startTime < 0) { startTime = 0L; @@ -137,7 +140,7 @@ public Response getPcapsByIdentifiers( if(LOGGER.isDebugEnabled()) { LOGGER.debug("Query received: " + query); } - response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) + results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) , startTime , endTime @@ -146,13 +149,17 @@ public Response getPcapsByIdentifiers( , CONFIGURATION.get() , FileSystem.get(CONFIGURATION.get()) , new QueryPcapFilter.Configurator() - ) ); + response.setPcaps(results != null ? Lists.newArrayList(results) : null); } catch (Exception e) { LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", e); throw new WebApplicationException("Unable to fetch Pcaps via MR job", e); + } finally { + if (null != results) { + results.cleanup(); + } } // return http status '200 OK' along with the complete pcaps response file, @@ -205,6 +212,7 @@ public Response getPcapsByIdentifiers( final boolean includeReverseTrafficF = includeReverseTraffic; PcapsResponse response = new PcapsResponse(); + SequenceFileIterable results = null; try { if(startTime < 0) { startTime = 0L; @@ -237,22 +245,26 @@ public Response getPcapsByIdentifiers( if(LOGGER.isDebugEnabled()) { LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet())); } - response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) - , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) - , startTime - , endTime - , numReducers - , query - , CONFIGURATION.get() - , FileSystem.get(CONFIGURATION.get()) - , new FixedPcapFilter.Configurator() - ) - ); + results = getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) + , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) + , startTime + , endTime + , numReducers + , query + , CONFIGURATION.get() + , FileSystem.get(CONFIGURATION.get()) + , new FixedPcapFilter.Configurator() + ); + response.setPcaps(results != null ? Lists.newArrayList(results) : null); } catch (Exception e) { LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", e); throw new WebApplicationException("Unable to fetch Pcaps via MR job", e); + } finally { + if (null != results) { + results.cleanup(); + } } // return http status '200 OK' along with the complete pcaps response file, diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java index 1c1c23609c..dba87cf397 100644 --- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java +++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; @@ -31,7 +32,6 @@ import java.io.IOException; import java.util.EnumMap; -import java.util.List; public class PcapReceiverImplRestEasyTest { @@ -44,7 +44,7 @@ public static class MockQueryHandler extends PcapJob { PcapFilterConfigurator filterImpl; @Override - public List query( Path basePath + public SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS , long endNS diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java new file mode 100644 index 0000000000..177825061c --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.hadoop; + +import com.google.common.collect.Iterators; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static java.lang.String.format; + +public class SequenceFileIterable implements Iterable { + private static final Logger LOGGER = Logger.getLogger(SequenceFileIterable.class); + private List files; + private Configuration config; + + public SequenceFileIterable(List files, Configuration config) { + this.files = files; + this.config = config; + } + + @Override + public Iterator iterator() { + return Iterators.concat(getIterators(files, config)); + } + + private Iterator[] getIterators(List files, Configuration config) { + return files.stream().map(f -> new SequenceFileIterator(f, config)).toArray(Iterator[]::new); + } + + /** + * Cleans up all files read by this Iterable + * + * @return true if success, false if any files were not deleted + * @throws IOException + */ + public boolean cleanup() throws IOException { + FileSystem fileSystem = FileSystem.get(config); + boolean success = true; + for (Path file : files) { + success &= fileSystem.delete(file, false); + } + return success; + } + + private static class SequenceFileIterator implements Iterator { + private Path path; + private Configuration config; + private SequenceFile.Reader reader; + private LongWritable key = new LongWritable(); + private BytesWritable value = new BytesWritable(); + private byte[] next; + private boolean finished = false; + + public SequenceFileIterator(Path path, Configuration config) { + this.path = path; + this.config = config; + } + + @Override + public boolean hasNext() { + if (!finished && null == reader) { + try { + reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(path)); + LOGGER.debug("Writing file: " + path.toString()); + } catch (IOException e) { + throw new RuntimeException("Failed to get reader", e); + } + } else { + LOGGER.debug(format("finished=%s, reader=%s, next=%s", finished, reader, next)); + } + try { + //ensure hasnext is idempotent + if (!finished) { + if (null == next && reader.next(key, value)) { + next = value.copyBytes(); + } else if (null == next) { + close(); + } + } + } catch (IOException e) { + close(); + throw new RuntimeException("Failed to get next record", e); + } + return (null != next); + } + + private void close() { + LOGGER.debug("Closing file: " + path.toString()); + finished = true; + try { + if (reader != null) { + reader.close(); + reader = null; + } + } catch (IOException e) { + // ah well, we tried... + } + } + + @Override + public byte[] next() { + byte[] ret = null; + if (hasNext()) { + ret = next; + next = null; //don't want same record more than once + } else { + throw new NoSuchElementException("No more records"); + } + return ret; + } + } +} diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java index b600a3eded..294844fe03 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java @@ -29,6 +29,7 @@ public class CliConfig { private long startTime; private long endTime; private int numReducers; + private int numRecordsPerFile; private DateFormat dateFormat; public CliConfig() { @@ -99,4 +100,12 @@ public DateFormat getDateFormat() { public void setNumReducers(int numReducers) { this.numReducers = numReducers; } + + public int getNumRecordsPerFile() { + return numRecordsPerFile; + } + + public void setNumRecordsPerFile(int numRecordsPerFile) { + this.numRecordsPerFile = numRecordsPerFile; + } } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index b046ac6261..83e9fcfc7f 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -27,6 +27,7 @@ public class CliParser { public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap"; public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp"; public static final int NUM_REDUCERS_DEFAULT = 10; + public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; private CommandLineParser parser; public CliParser() { @@ -40,6 +41,7 @@ public Options buildOptions() { options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT))); + options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT))); options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time.")); options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch.")); return options; @@ -92,6 +94,13 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa else { config.setNumReducers(NUM_REDUCERS_DEFAULT); } + if (commandLine.hasOption("records_per_file")) { + int numRecordsPerFile = Integer.parseInt(commandLine.getOptionValue("records_per_file")); + config.setNumRecordsPerFile(numRecordsPerFile); + } + else { + config.setNumRecordsPerFile(NUM_RECORDS_PER_FILE_DEFAULT); + } if (commandLine.hasOption("end_time")) { try { if (commandLine.hasOption("date_format")) { diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 790eb442fd..d2e6807471 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -17,12 +17,14 @@ */ package org.apache.metron.pcap.query; +import com.google.common.collect.Iterables; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; @@ -32,8 +34,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class PcapCli { private static final Logger LOGGER = LoggerFactory.getLogger(PcapCli.class); @@ -58,7 +60,7 @@ public int run(String[] args) { return -1; } String jobType = args[0]; - Iterable results = new ArrayList<>(); + SequenceFileIterable results = null; String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); Configuration hadoopConf = new Configuration(); String[] otherArgs = null; @@ -68,11 +70,13 @@ public int run(String[] args) { LOGGER.error("Failed to configure hadoop with provided options: " + e.getMessage(), e); return -1; } + CliConfig commonConfig = null; if ("fixed".equals(jobType)) { FixedCliParser fixedParser = new FixedCliParser(); FixedCliConfig config = null; try { config = fixedParser.parse(otherArgs); + commonConfig = config; } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); System.err.flush(); @@ -110,6 +114,7 @@ public int run(String[] args) { QueryCliConfig config = null; try { config = queryParser.parse(otherArgs); + commonConfig = config; } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); queryParser.printHelp(); @@ -145,18 +150,28 @@ public int run(String[] args) { printBasicHelp(); return -1; } - String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"); - String outFileName = String.format("pcap-data-%s.pcap", timestamp); try { -// if(results.size() > 0) { - resultsWriter.write(results, outFileName); -// } -// else { -// System.out.println("No results returned."); -// } + Iterable> partitions = Iterables.partition(results, commonConfig.getNumRecordsPerFile()); + if (partitions.iterator().hasNext()) { + for (List data : partitions) { + String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"); + String outFileName = String.format("pcap-data-%s.pcap", timestamp); + if(data.size() > 0) { + resultsWriter.write(data, outFileName); + } + } + } else { + System.out.println("No results returned."); + } } catch (IOException e) { LOGGER.error("Unable to write file", e); return -1; + } finally { + try { + results.cleanup(); + } catch(IOException e) { + LOGGER.warn("Unable to cleanup files in HDFS", e); + } } return 0; } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java index ff2539c6f6..ab11770b01 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/ResultsWriter.java @@ -27,14 +27,14 @@ public class ResultsWriter { - public void write(Iterable pcaps, String outPath) throws IOException { + public void write(List pcaps, String outPath) throws IOException { File out = new File(outPath); try (FileOutputStream fos = new FileOutputStream(out)) { fos.write(mergePcaps(pcaps)); } } - public byte[] mergePcaps(Iterable pcaps) throws IOException { + public byte[] mergePcaps(List pcaps) throws IOException { if (pcaps == null) { return new byte[]{}; } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 17c9325a02..81725d8633 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -19,8 +19,11 @@ package org.apache.metron.pcap; import com.google.common.collect.Iterables; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; @@ -30,6 +33,9 @@ import java.util.ArrayList; import java.util.List; +import static java.lang.Long.toUnsignedString; +import static org.hamcrest.CoreMatchers.equalTo; + public class PcapJobTest { @Test @@ -48,6 +54,7 @@ protected Iterable listFiles(FileSystem fs, Path basePath) throws IOExcept Assert.assertTrue(Iterables.isEmpty(paths)); } } + @Test public void test_getPaths_leftEdge() throws Exception { PcapJob job; @@ -63,9 +70,10 @@ protected Iterable listFiles(FileSystem fs, Path basePath) throws IOExcept } }; Iterable paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); - Assert.assertEquals(1,Iterables.size(paths)); + Assert.assertEquals(1, Iterables.size(paths)); } } + @Test public void test_getPaths_rightEdge() throws Exception { PcapJob job; @@ -80,8 +88,8 @@ protected Iterable listFiles(FileSystem fs, Path basePath) throws IOExcept return inputFiles; } }; - Iterable paths = job.getPaths(null, null, 1461589333993573000L-1L, 1461589333993573000L + 1L); - Assert.assertEquals(2,Iterables.size(paths)); + Iterable paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L); + Assert.assertEquals(2, Iterables.size(paths)); } { final List inputFiles = new ArrayList() {{ @@ -95,10 +103,11 @@ protected Iterable listFiles(FileSystem fs, Path basePath) throws IOExcept return inputFiles; } }; - Iterable paths = job.getPaths(null, null, 1461589334993573000L-1L, 1461589334993573000L + 1L); - Assert.assertEquals(2,Iterables.size(paths)); + Iterable paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L); + Assert.assertEquals(2, Iterables.size(paths)); } } + @Test public void test_getPaths_bothEdges() throws Exception { PcapJob job; @@ -115,7 +124,20 @@ protected Iterable listFiles(FileSystem fs, Path basePath) throws IOExcept } }; Iterable paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); - Assert.assertEquals(3,Iterables.size(paths)); + Assert.assertEquals(3, Iterables.size(paths)); } } + + @Test + public void partition_gives_value_in_range() throws Exception { + long start = 1473897600000000000L; + long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); + Configuration conf = new Configuration(); + conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); + conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); + conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); + PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); + partitioner.setConf(conf); + Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8)); + } } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index d4367ea26a..0dd07aa2e6 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -271,7 +271,7 @@ public Void getResult() { PcapJob job = new PcapJob(); { //Ensure that only two pcaps are returned when we look at 4 and 5 - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) @@ -283,12 +283,12 @@ public Void getResult() { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 2); + Assert.assertEquals(Iterables.size(results), 2); } { // Ensure that only two pcaps are returned when we look at 4 and 5 // test with empty query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) @@ -300,11 +300,11 @@ public Void getResult() { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 2); + Assert.assertEquals(Iterables.size(results), 2); } { //ensure that none get returned since that destination IP address isn't in the dataset - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -318,12 +318,12 @@ public Void getResult() { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { // ensure that none get returned since that destination IP address isn't in the dataset // test with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -335,11 +335,11 @@ public Void getResult() { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { //same with protocol as before with the destination addr - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -353,12 +353,12 @@ public Void getResult() { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { //same with protocol as before with the destination addr //test with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -370,11 +370,11 @@ public Void getResult() { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), 0); + Assert.assertEquals(Iterables.size(results), 0); } { //make sure I get them all. - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -386,12 +386,12 @@ public Void getResult() { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), pcapEntries.size()); + Assert.assertEquals(Iterables.size(results), pcapEntries.size()); } { //make sure I get them all. //with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -403,10 +403,10 @@ public Void getResult() { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertEquals(results.size(), pcapEntries.size()); + Assert.assertEquals(Iterables.size(results), pcapEntries.size()); } { - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -420,8 +420,8 @@ public Void getResult() { , new FixedPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertTrue(results.size() > 0); - Assert.assertEquals(results.size() + Assert.assertTrue(Iterables.size(results) > 0); + Assert.assertEquals(Iterables.size(results) , Iterables.size(filterPcaps(pcapEntries, new Predicate() { @Override public boolean apply(@Nullable JSONObject input) { @@ -432,12 +432,12 @@ public boolean apply(@Nullable JSONObject input) { ) ); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, results); + PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next()); Assert.assertTrue(baos.toByteArray().length > 0); } { //test with query filter - List results = + Iterable results = job.query(new Path(outDir.getAbsolutePath()) , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) @@ -449,8 +449,8 @@ public boolean apply(@Nullable JSONObject input) { , new QueryPcapFilter.Configurator() ); assertInOrder(results); - Assert.assertTrue(results.size() > 0); - Assert.assertEquals(results.size() + Assert.assertTrue(Iterables.size(results) > 0); + Assert.assertEquals(Iterables.size(results) , Iterables.size(filterPcaps(pcapEntries, new Predicate() { @Override public boolean apply(@Nullable JSONObject input) { @@ -462,7 +462,7 @@ public boolean apply(@Nullable JSONObject input) { ); assertInOrder(results); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PcapMerger.merge(baos, results); + PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next()); Assert.assertTrue(baos.toByteArray().length > 0); } System.out.println("Ended"); diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 920db5a322..bad22e47b8 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; @@ -38,14 +39,12 @@ import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.EnumMap; -import java.util.List; +import java.util.*; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.*; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class PcapCliTest { @@ -74,6 +73,9 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep "-protocol", "6" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); @@ -86,7 +88,7 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -108,9 +110,13 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef "-ip_dst_port", "8082", "-protocol", "6", "-include_reverse", - "-num_reducers", "10" + "-num_reducers", "10", + "-records_per_file", "1000" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); @@ -123,7 +129,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -146,9 +152,13 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio "-ip_dst_port", "8082", "-protocol", "6", "-include_reverse", - "-num_reducers", "10" + "-num_reducers", "10", + "-records_per_file", "1000" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); @@ -163,7 +173,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss"); long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss"); - when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -189,12 +199,15 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep "-query", "some query string" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path(CliParser.BASE_PATH_DEFAULT); Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -211,15 +224,19 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio "-num_reducers", "10", "-base_path", "/base/path", "-base_output_path", "/base/output/path", - "-query", "some query string" + "-query", "some query string", + "-records_per_file", "1000" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); + Iterator iterator = pcaps.iterator(); + SequenceFileIterable iterable = mock(SequenceFileIterable.class); + when(iterable.iterator()).thenReturn(iterator); Path base_path = new Path("/base/path"); Path base_output_path = new Path("/base/output/path"); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index b2e03fb2d9..f8746208a2 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; @@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.log4j.Logger; +import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; @@ -64,6 +64,11 @@ public int getPartition(LongWritable longWritable, BytesWritable bytesWritable, } long x = longWritable.get(); int ret = (int)Long.divideUnsigned(x - start, width); + if(ret > numPartitions) { + throw new IllegalArgumentException(String.format("Bad partition: key=%s, width=%d, partition=%d, numPartitions=%d" + , Long.toUnsignedString(x), width, ret, numPartitions) + ); + } return ret; } @@ -176,32 +181,26 @@ public Iterable getPaths(FileSystem fs, Path basePath, long begin, long return ret; } - private Iterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { - List ret = new ArrayList<>(); - for(RemoteIterator it= fs.listFiles(outputPath, false);it.hasNext();) { + /** + * Returns a lazily-read Iterable over a set of sequence files + */ + private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException { + List files = new ArrayList<>(); + for (RemoteIterator it = fs.listFiles(outputPath, false); it.hasNext(); ) { Path p = it.next().getPath(); - if(p.getName().equals("_SUCCESS")) { + if (p.getName().equals("_SUCCESS")) { fs.delete(p, false); continue; } - SequenceFile.Reader reader = new SequenceFile.Reader(config, - SequenceFile.Reader.file(p)); - LongWritable key = new LongWritable(); - BytesWritable value = new BytesWritable(); - while(reader.next(key, value)) { - ret.add(value.copyBytes()); - } - reader.close(); - fs.delete(p, false); + files.add(p); } - fs.delete(outputPath, false); - if(LOG.isDebugEnabled()) { - LOG.debug(outputPath + ": Returning " + ret.size()); + if (LOG.isDebugEnabled()) { + LOG.debug(outputPath); } - return ret; + return new SequenceFileIterable(files, config); } - public Iterable query(Path basePath + public SequenceFileIterable query(Path basePath , Path baseOutputPath , long beginNS , long endNS @@ -240,12 +239,10 @@ public Iterable query(Path basePath } } - - public static int findWidth(long start, long end, int numReducers) { - return (int)Long.divideUnsigned(end - start, numReducers) + 1; + public static long findWidth(long start, long end, int numReducers) { + return Long.divideUnsigned(end - start, numReducers) + 1; } - public Job createJob( Path basePath , Path outputPath , long beginNS diff --git a/pom.xml b/pom.xml index d7e373d26b..659a467d53 100644 --- a/pom.xml +++ b/pom.xml @@ -202,7 +202,7 @@ metron-ui/lib/public/font/** metron-ui/node_modules/** - metron-deployment/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p + **/packaging/ambari/src/main/resources/common-services/KIBANA/4.5.1/package/scripts/dashboard/*.p From 2227a70965393e9cd329bc4f55e769ee7993c294 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Fri, 16 Sep 2016 09:24:09 -0400 Subject: [PATCH 3/3] METRON-257 Enable pcap result pagination from the Pcap CLI --- .../org/apache/metron/common/hadoop/SequenceFileIterable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java index 177825061c..a57cd35c1f 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/hadoop/SequenceFileIterable.java @@ -120,6 +120,7 @@ private void close() { } } catch (IOException e) { // ah well, we tried... + LOGGER.warn("Error closing file", e); } }