From 75708255120f47ba3ab47d3003e4c42f76cc99e8 Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 18 Aug 2016 20:14:53 -0400 Subject: [PATCH 1/2] METRON-381: Add support for multiple reducers in pcap_query.sh --- metron-platform/metron-pcap-backend/README.md | 4 ++ .../apache/metron/pcap/query/CliConfig.java | 9 ++++ .../apache/metron/pcap/query/CliParser.java | 8 +++ .../org/apache/metron/pcap/query/PcapCli.java | 9 +++- .../PcapTopologyIntegrationTest.java | 10 ++++ .../apache/metron/pcap/query/PcapCliTest.java | 23 +++++--- .../org/apache/metron/pcap/mr/PcapJob.java | 53 +++++++++++++++++-- 7 files changed, 102 insertions(+), 14 deletions(-) diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md index b708fe5648..927ae40445 100644 --- a/metron-platform/metron-pcap-backend/README.md +++ b/metron-platform/metron-pcap-backend/README.md @@ -96,6 +96,8 @@ usage: Fixed filter options -dp,--ip_dst_port Destination port -et,--end_time Packet end time range. Default is current system time. + -nr,--num_reducers The number of reducers to use. Default + is 10. -h,--help Display help -ir,--include_reverse Indicates if filter should check swapped src/dest addresses and IPs @@ -116,6 +118,8 @@ usage: Query filter options millis since the epoch. -et,--end_time Packet end time range. Default is current system time. + -nr,--num_reducers The number of reducers to use. Default + is 10. -h,--help Display help -q,--query Query string to use as a filter -st,--start_time (required) Packet start time range. diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java index a0271b8c9c..f8ab0ac020 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java @@ -30,6 +30,7 @@ public class CliConfig { private String baseOutputPath; private long startTime; private long endTime; + private int numReducers = 0; private DateFormat dateFormat; public CliConfig() { @@ -40,6 +41,10 @@ public CliConfig() { endTime = -1L; } + public int getNumReducers() { + return numReducers; + } + public boolean showHelp() { return showHelp; } @@ -91,4 +96,8 @@ public void setDateFormat(String dateFormat) { public DateFormat getDateFormat() { return dateFormat; } + + public void setNumReducers(int numReducers) { + this.numReducers = numReducers; + } } diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index 4fbb05d6c0..ea6f8e7df2 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -36,6 +36,7 @@ public Options buildOptions() { options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT))); options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); + options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true)); options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time.")); options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch.")); return options; @@ -77,6 +78,13 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa //no-op } } + if (commandLine.hasOption("num_reducers")) { + int numReducers = Integer.parseInt(commandLine.getOptionValue("num_reducers")); + config.setNumReducers(numReducers); + } + else { + config.setNumReducers(10); + } if (commandLine.hasOption("end_time")) { try { if (commandLine.hasOption("date_format")) { diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index e7ce4da86c..d96e16650b 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -93,6 +93,7 @@ public int run(String[] args) { new Path(config.getBaseOutputPath()), startTime, endTime, + config.getNumReducers(), config.getFixedFields(), hadoopConf, FileSystem.get(hadoopConf), @@ -128,6 +129,7 @@ public int run(String[] args) { new Path(config.getBaseOutputPath()), startTime, endTime, + config.getNumReducers(), config.getQuery(), hadoopConf, FileSystem.get(hadoopConf), @@ -146,7 +148,12 @@ public int run(String[] args) { String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ"); String outFileName = String.format("pcap-data-%s.pcap", timestamp); try { - resultsWriter.write(results, outFileName); + if(results.size() > 0) { + resultsWriter.write(results, outFileName); + } + else { + System.out.println("No results returned."); + } } catch (IOException e) { LOGGER.error("Unable to write file", e); return -1; diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 8fcdeedb64..d4367ea26a 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -276,6 +276,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) + , 10 , new EnumMap<>(Constants.Fields.class) , new Configuration() , FileSystem.get(new Configuration()) @@ -292,6 +293,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(4, pcapEntries) , getTimestamp(5, pcapEntries) + , 10 , "" , new Configuration() , FileSystem.get(new Configuration()) @@ -307,6 +309,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) + , 10 , new EnumMap(Constants.Fields.class) {{ put(Constants.Fields.DST_ADDR, "207.28.210.1"); }} @@ -325,6 +328,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) + , 10 , "ip_dst_addr == '207.28.210.1'" , new Configuration() , FileSystem.get(new Configuration()) @@ -340,6 +344,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) + , 10 , new EnumMap(Constants.Fields.class) {{ put(Constants.Fields.PROTOCOL, "foo"); }} @@ -358,6 +363,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(1, pcapEntries) + , 10 , "protocol == 'foo'" , new Configuration() , FileSystem.get(new Configuration()) @@ -373,6 +379,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , 10 , new EnumMap<>(Constants.Fields.class) , new Configuration() , FileSystem.get(new Configuration()) @@ -389,6 +396,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , 10 , "" , new Configuration() , FileSystem.get(new Configuration()) @@ -403,6 +411,7 @@ public Void getResult() { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , 10 , new EnumMap(Constants.Fields.class) {{ put(Constants.Fields.DST_PORT, "22"); }} @@ -433,6 +442,7 @@ public boolean apply(@Nullable JSONObject input) { , new Path(queryDir.getAbsolutePath()) , getTimestamp(0, pcapEntries) , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , 10 , "ip_dst_port == '22'" , new Configuration() , FileSystem.get(new Configuration()) diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 92ab26a217..4d6432e172 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -71,7 +71,8 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep "-ip_dst_addr", "192.168.1.2", "-ip_src_port", "8081", "-ip_dst_port", "8082", - "-protocol", "6" + "-protocol", "6", + "-num_reducers", "10" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); @@ -86,7 +87,7 @@ public void runs_fixed_pcap_filter_job_with_default_argument_list() throws Excep put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -107,7 +108,8 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef "-ip_src_port", "8081", "-ip_dst_port", "8082", "-protocol", "6", - "-include_reverse" + "-include_reverse", + "-num_reducers", "10" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); @@ -122,7 +124,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list_and_default_datef put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); }}; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -144,7 +146,8 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio "-ip_src_port", "8081", "-ip_dst_port", "8082", "-protocol", "6", - "-include_reverse" + "-include_reverse", + "-num_reducers", "10" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); @@ -161,7 +164,7 @@ public void runs_fixed_pcap_filter_job_with_full_argument_list() throws Exceptio long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss"); long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss"); - when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(pcaps); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -184,6 +187,7 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep String[] args = { "query", "-start_time", "500", + "-num_reducers", "10", "-query", "some query string" }; List pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")}); @@ -192,7 +196,7 @@ public void runs_query_pcap_filter_job_with_default_argument_list() throws Excep Path base_output_path = new Path(CliConfig.BASE_OUTPUT_PATH_DEFAULT); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -206,6 +210,7 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio "query", "-start_time", "500", "-end_time", "1000", + "-num_reducers", "10", "-base_path", "/base/path", "-base_output_path", "/base/output/path", "-query", "some query string" @@ -216,7 +221,7 @@ public void runs_query_pcap_filter_job_with_full_argument_list() throws Exceptio Path base_output_path = new Path("/base/output/path"); String query = "some query string"; - when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); + when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(pcaps); when(clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ")).thenReturn("20160615183527162+0000"); PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock); @@ -235,6 +240,7 @@ public void invalid_fixed_filter_arg_prints_help() throws Exception { "fixed", "-start_time", "500", "-end_time", "1000", + "-num_reducers", "10", "-base_path", "/base/path", "-base_output_path", "/base/output/path", "-query", "THIS IS AN ERROR" @@ -259,6 +265,7 @@ public void invalid_query_filter_arg_prints_help() throws Exception { "query", "-start_time", "500", "-end_time", "1000", + "-num_reducers", "10", "-base_path", "/base/path", "-base_output_path", "/base/output/path", "-ip_src_addr", "THIS IS AN ERROR" diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index a181637f3a..cce40748c2 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -19,6 +19,7 @@ package org.apache.metron.pcap.mr; import com.google.common.base.Joiner; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -29,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -47,9 +49,42 @@ public class PcapJob { private static final Logger LOG = Logger.getLogger(PcapJob.class); + public static final String START_TS_CONF = "start_ts"; + public static final String END_TS_CONF = "end_ts"; + public static final String WIDTH_CONF = "width"; + public static class PcapPartitioner extends Partitioner implements Configurable { + private Configuration configuration; + Long start = null; + Long end = null; + Long width = null; + @Override + public int getPartition(LongWritable longWritable, BytesWritable bytesWritable, int numPartitions) { + if(start == null) { + initialize(); + } + long x = longWritable.get(); + int ret = (int)Long.divideUnsigned(x - start, width); + return ret; + } + + private void initialize() { + start = Long.parseUnsignedLong(configuration.get(START_TS_CONF)); + end = Long.parseUnsignedLong(configuration.get(END_TS_CONF)); + width = Long.parseLong(configuration.get(WIDTH_CONF)); + } + + @Override + public void setConf(Configuration conf) { + this.configuration = conf; + } + + @Override + public Configuration getConf() { + return configuration; + } + } public static class PcapMapper extends Mapper { - public static final String START_TS_CONF = "start_ts"; - public static final String END_TS_CONF = "end_ts"; + PcapFilter filter; long start; long end; @@ -170,6 +205,7 @@ public List query(Path basePath , Path baseOutputPath , long beginNS , long endNS + , int numReducers , T fields , Configuration conf , FileSystem fs @@ -189,6 +225,7 @@ public List query(Path basePath , outputPath , beginNS , endNS + , numReducers , fields , conf , fs @@ -204,28 +241,34 @@ public List query(Path basePath } + public static int findWidth(long start, long end, int numReducers) { + return (int)Long.divideUnsigned(end - start, numReducers) + 1; + } public Job createJob( Path basePath , Path outputPath , long beginNS , long endNS + , int numReducers , T fields , Configuration conf , FileSystem fs , PcapFilterConfigurator filterImpl ) throws IOException { - conf.set(PcapMapper.START_TS_CONF, Long.toUnsignedString(beginNS)); - conf.set(PcapMapper.END_TS_CONF, Long.toUnsignedString(endNS)); + conf.set(START_TS_CONF, Long.toUnsignedString(beginNS)); + conf.set(END_TS_CONF, Long.toUnsignedString(endNS)); + conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers)); filterImpl.addToConfig(fields, conf); Job job = new Job(conf); job.setJarByClass(PcapJob.class); job.setMapperClass(PcapJob.PcapMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(BytesWritable.class); - job.setNumReduceTasks(1); + job.setNumReduceTasks(numReducers); job.setReducerClass(PcapReducer.class); + job.setPartitionerClass(PcapPartitioner.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(BytesWritable.class); SequenceFileInputFormat.addInputPaths(job, Joiner.on(',').join(getPaths(fs, basePath, beginNS, endNS ))); From 7d77bf8d0e3d8d69e0476109bfbe8b10d610931a Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 19 Aug 2016 07:55:12 -0400 Subject: [PATCH 2/2] updating REST interface. --- .../pcapservice/PcapReceiverImplRestEasy.java | 5 +++++ .../PcapReceiverImplRestEasyTest.java | 19 ++++++++++--------- .../components/KafkaWithZKComponent.java | 8 ++++++-- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java index 95103184b2..18b5dc92d9 100644 --- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java +++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -104,6 +104,7 @@ private static boolean isValidPort(String port) { * @param query Filter results based on this query * @param startTime Only return packets originating after this start time * @param endTime Only return packets originating before this end time + * @param numReducers Number of reducers to use * @param servlet_response * @return REST response * @throws IOException @@ -114,6 +115,7 @@ public Response getPcapsByIdentifiers( @QueryParam ("query") String query, @DefaultValue("-1") @QueryParam ("startTime")long startTime, @DefaultValue("-1") @QueryParam ("endTime")long endTime, + @DefaultValue("10") @QueryParam ("numReducers")int numReducers, @Context HttpServletResponse servlet_response) throws IOException { @@ -139,6 +141,7 @@ public Response getPcapsByIdentifiers( , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) , startTime , endTime + , numReducers , query , CONFIGURATION.get() , FileSystem.get(CONFIGURATION.get()) @@ -184,6 +187,7 @@ public Response getPcapsByIdentifiers( @QueryParam ("dstPort") String dstPort, @DefaultValue("-1") @QueryParam ("startTime")long startTime, @DefaultValue("-1") @QueryParam ("endTime")long endTime, + @DefaultValue("10") @QueryParam ("numReducers")int numReducers, @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic, @Context HttpServletResponse servlet_response) @@ -237,6 +241,7 @@ public Response getPcapsByIdentifiers( , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) , startTime , endTime + , numReducers , query , CONFIGURATION.get() , FileSystem.get(CONFIGURATION.get()) diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java index bfe2233e15..1c1c23609c 100644 --- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java +++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java @@ -48,6 +48,7 @@ public List query( Path basePath , Path baseOutputPath , long beginNS , long endNS + , int numReducers , T fields , Configuration conf , FileSystem fs @@ -91,7 +92,7 @@ public void testNormalFixedPath() throws Exception { { boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); @@ -104,7 +105,7 @@ public void testNormalFixedPath() throws Exception { } { boolean includeReverseTraffic = true; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); @@ -122,7 +123,7 @@ public void testNormalQueryPath() throws Exception { long startTime = 100; long endTime = 1000; String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; - queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null); + queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath); Assert.assertEquals(query, queryQueryHandler.fields); @@ -140,7 +141,7 @@ public void testNullSrcIp() throws Exception { long startTime = 100; long endTime = 1000; boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); @@ -162,7 +163,7 @@ public void testNullDstIp() throws Exception { long startTime = 100; long endTime = 1000; boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); @@ -185,7 +186,7 @@ public void testEmptyStartTime() throws Exception { long endTime = 1000; { boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); @@ -198,7 +199,7 @@ public void testEmptyStartTime() throws Exception { } { String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; - queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null); + queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath); Assert.assertEquals(query, queryQueryHandler.fields); @@ -218,7 +219,7 @@ public void testEmptyEndTime() throws Exception { long endTime = -1; { boolean includeReverseTraffic = false; - fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); @@ -231,7 +232,7 @@ public void testEmptyEndTime() throws Exception { } { String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; - queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null); + queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null); Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath); Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath); Assert.assertEquals(query, queryQueryHandler.fields); diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java index 6d2261b4da..ffe7b547b9 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java @@ -157,8 +157,12 @@ public String getZookeeperConnect() { @Override public void stop() { - kafkaServer.shutdown(); - zkClient.close(); + if(kafkaServer != null) { + kafkaServer.shutdown(); + } + if(zkClient != null) { + zkClient.close(); + } if(zkServer != null) { zkServer.shutdown(); }