Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private static boolean isValidPort(String port) {
* @param query Filter results based on this query
* @param startTime Only return packets originating after this start time
* @param endTime Only return packets originating before this end time
* @param numReducers Number of reducers to use
* @param servlet_response
* @return REST response
* @throws IOException
Expand All @@ -114,6 +115,7 @@ public Response getPcapsByIdentifiers(
@QueryParam ("query") String query,
@DefaultValue("-1") @QueryParam ("startTime")long startTime,
@DefaultValue("-1") @QueryParam ("endTime")long endTime,
@DefaultValue("10") @QueryParam ("numReducers")int numReducers,
@Context HttpServletResponse servlet_response)

throws IOException {
Expand All @@ -139,6 +141,7 @@ public Response getPcapsByIdentifiers(
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
, numReducers
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
Expand Down Expand Up @@ -184,6 +187,7 @@ public Response getPcapsByIdentifiers(
@QueryParam ("dstPort") String dstPort,
@DefaultValue("-1") @QueryParam ("startTime")long startTime,
@DefaultValue("-1") @QueryParam ("endTime")long endTime,
@DefaultValue("10") @QueryParam ("numReducers")int numReducers,
@DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
@Context HttpServletResponse servlet_response)

Expand Down Expand Up @@ -237,6 +241,7 @@ public Response getPcapsByIdentifiers(
, new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
, startTime
, endTime
, numReducers
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public <T> List<byte[]> query( Path basePath
, Path baseOutputPath
, long beginNS
, long endNS
, int numReducers
, T fields
, Configuration conf
, FileSystem fs
Expand Down Expand Up @@ -91,7 +92,7 @@ public void testNormalFixedPath() throws Exception {

{
boolean includeReverseTraffic = false;
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
Expand All @@ -104,7 +105,7 @@ public void testNormalFixedPath() throws Exception {
}
{
boolean includeReverseTraffic = true;
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
Expand All @@ -122,7 +123,7 @@ public void testNormalQueryPath() throws Exception {
long startTime = 100;
long endTime = 1000;
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
Assert.assertEquals(query, queryQueryHandler.fields);
Expand All @@ -140,7 +141,7 @@ public void testNullSrcIp() throws Exception {
long startTime = 100;
long endTime = 1000;
boolean includeReverseTraffic = false;
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
Expand All @@ -162,7 +163,7 @@ public void testNullDstIp() throws Exception {
long startTime = 100;
long endTime = 1000;
boolean includeReverseTraffic = false;
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
Expand All @@ -185,7 +186,7 @@ public void testEmptyStartTime() throws Exception {
long endTime = 1000;
{
boolean includeReverseTraffic = false;
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
Expand All @@ -198,7 +199,7 @@ public void testEmptyStartTime() throws Exception {
}
{
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
Assert.assertEquals(query, queryQueryHandler.fields);
Expand All @@ -218,7 +219,7 @@ public void testEmptyEndTime() throws Exception {
long endTime = -1;
{
boolean includeReverseTraffic = false;
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, 10, includeReverseTraffic, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
Expand All @@ -231,7 +232,7 @@ public void testEmptyEndTime() throws Exception {
}
{
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, 10, null);
Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
Assert.assertEquals(query, queryQueryHandler.fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@ public String getZookeeperConnect() {

@Override
public void stop() {
kafkaServer.shutdown();
zkClient.close();
if(kafkaServer != null) {
kafkaServer.shutdown();
}
if(zkClient != null) {
zkClient.close();
}
if(zkServer != null) {
zkServer.shutdown();
}
Expand Down
4 changes: 4 additions & 0 deletions metron-platform/metron-pcap-backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ usage: Fixed filter options
-dp,--ip_dst_port <arg> Destination port
-et,--end_time <arg> Packet end time range. Default is current
system time.
-nr,--num_reducers <arg> The number of reducers to use. Default
is 10.
-h,--help Display help
-ir,--include_reverse Indicates if filter should check swapped
src/dest addresses and IPs
Expand All @@ -116,6 +118,8 @@ usage: Query filter options
millis since the epoch.
-et,--end_time <arg> Packet end time range. Default is current
system time.
-nr,--num_reducers <arg> The number of reducers to use. Default
is 10.
-h,--help Display help
-q,--query <arg> Query string to use as a filter
-st,--start_time <arg> (required) Packet start time range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class CliConfig {
private String baseOutputPath;
private long startTime;
private long endTime;
private int numReducers = 0;
private DateFormat dateFormat;

public CliConfig() {
Expand All @@ -40,6 +41,10 @@ public CliConfig() {
endTime = -1L;
}

public int getNumReducers() {
return numReducers;
}

public boolean showHelp() {
return showHelp;
}
Expand Down Expand Up @@ -91,4 +96,8 @@ public void setDateFormat(String dateFormat) {
public DateFormat getDateFormat() {
return dateFormat;
}

public void setNumReducers(int numReducers) {
this.numReducers = numReducers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public Options buildOptions() {
options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", CliConfig.BASE_PATH_DEFAULT)));
options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", CliConfig.BASE_OUTPUT_PATH_DEFAULT)));
options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
options.addOption(newOption("nr", "num_reducers", true, "Number of reducers to use", true));
options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
return options;
Expand Down Expand Up @@ -77,6 +78,13 @@ public void parse(CommandLine commandLine, CliConfig config) throws java.text.Pa
//no-op
}
}
if (commandLine.hasOption("num_reducers")) {
int numReducers = Integer.parseInt(commandLine.getOptionValue("num_reducers"));
config.setNumReducers(numReducers);
}
else {
config.setNumReducers(10);
}
if (commandLine.hasOption("end_time")) {
try {
if (commandLine.hasOption("date_format")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public int run(String[] args) {
new Path(config.getBaseOutputPath()),
startTime,
endTime,
config.getNumReducers(),
config.getFixedFields(),
hadoopConf,
FileSystem.get(hadoopConf),
Expand Down Expand Up @@ -128,6 +129,7 @@ public int run(String[] args) {
new Path(config.getBaseOutputPath()),
startTime,
endTime,
config.getNumReducers(),
config.getQuery(),
hadoopConf,
FileSystem.get(hadoopConf),
Expand All @@ -146,7 +148,12 @@ public int run(String[] args) {
String timestamp = clock.currentTimeFormatted("yyyyMMddHHmmssSSSZ");
String outFileName = String.format("pcap-data-%s.pcap", timestamp);
try {
resultsWriter.write(results, outFileName);
if(results.size() > 0) {
resultsWriter.write(results, outFileName);
}
else {
System.out.println("No results returned.");
}
} catch (IOException e) {
LOGGER.error("Unable to write file", e);
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(4, pcapEntries)
, getTimestamp(5, pcapEntries)
, 10
, new EnumMap<>(Constants.Fields.class)
, new Configuration()
, FileSystem.get(new Configuration())
Expand All @@ -292,6 +293,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(4, pcapEntries)
, getTimestamp(5, pcapEntries)
, 10
, ""
, new Configuration()
, FileSystem.get(new Configuration())
Expand All @@ -307,6 +309,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
, 10
, new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.DST_ADDR, "207.28.210.1");
}}
Expand All @@ -325,6 +328,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
, 10
, "ip_dst_addr == '207.28.210.1'"
, new Configuration()
, FileSystem.get(new Configuration())
Expand All @@ -340,6 +344,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
, 10
, new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.PROTOCOL, "foo");
}}
Expand All @@ -358,6 +363,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(1, pcapEntries)
, 10
, "protocol == 'foo'"
, new Configuration()
, FileSystem.get(new Configuration())
Expand All @@ -373,6 +379,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
, 10
, new EnumMap<>(Constants.Fields.class)
, new Configuration()
, FileSystem.get(new Configuration())
Expand All @@ -389,6 +396,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
, 10
, ""
, new Configuration()
, FileSystem.get(new Configuration())
Expand All @@ -403,6 +411,7 @@ public Void getResult() {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
, 10
, new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
put(Constants.Fields.DST_PORT, "22");
}}
Expand Down Expand Up @@ -433,6 +442,7 @@ public boolean apply(@Nullable JSONObject input) {
, new Path(queryDir.getAbsolutePath())
, getTimestamp(0, pcapEntries)
, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
, 10
, "ip_dst_port == '22'"
, new Configuration()
, FileSystem.get(new Configuration())
Expand Down
Loading