From b3af68484718a66393f4b0417bc525c462afed5d Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Thu, 12 May 2016 15:43:09 -0400 Subject: [PATCH 1/3] METRON-155 Added query filtering capability for PCAP via Metron REST API --- .../pcapservice/PcapReceiverImplRestEasy.java | 90 +++++-- .../PcapReceiverImplRestEasyTest.java | 192 +++++++++------ .../org/apache/metron/common/Constants.java | 24 +- .../common/query/PredicateProcessor.java | 12 +- .../metron/common/query/QueryParserTest.java | 7 + ...lterTest.java => FixedPcapFilterTest.java} | 84 ++++--- .../metron/pcap/QueryPcapFilterTest.java | 227 ++++++++++++++++++ .../PcapTopologyIntegrationTest.java | 103 +++++++- .../apache/metron/pcap/filter/PcapFilter.java | 28 +++ .../pcap/filter/PcapFilterConfigurator.java | 27 +++ .../metron/pcap/filter/PcapFilters.java | 41 ++++ .../pcap/filter/fixed/FixedPcapFilter.java | 143 +++++++++++ .../pcap/filter/query/PcapFieldResolver.java | 42 ++++ .../pcap/filter/query/QueryPcapFilter.java | 74 ++++++ .../org/apache/metron/pcap/mr/PcapFilter.java | 121 ---------- .../org/apache/metron/pcap/mr/PcapJob.java | 48 ++-- 16 files changed, 987 insertions(+), 276 deletions(-) rename metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/{PcapFilterTest.java => FixedPcapFilterTest.java} (78%) create mode 100644 metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java create mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java delete mode 100644 metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java index 1d0beb8067..aa64aff9f1 100644 --- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java +++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -17,29 +17,29 @@ */ package org.apache.metron.pcapservice; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumMap; -import java.util.List; - -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; - -import com.google.common.annotations.VisibleForTesting; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; +import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.*; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.List; + @Path("/") public class PcapReceiverImplRestEasy { @@ -96,6 +96,66 @@ private static boolean isValidPort(String port) { } } return false; + } + /* + * (non-Javadoc) + * + * @see + * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang + * .String, java.lang.String, java.lang.String, java.lang.String, + * java.lang.String, long, long, boolean, + * javax.servlet.http.HttpServletResponse) + */ + @GET + @Path("/pcapGetter/getPcapsByQuery") + public Response getPcapsByIdentifiers( + @QueryParam ("query") String query, + @DefaultValue("-1") @QueryParam ("startTime")long startTime, + @DefaultValue("-1") @QueryParam ("endTime")long endTime, + @Context HttpServletResponse servlet_response) + + throws IOException { + PcapsResponse response = new PcapsResponse(); + try { + if (startTime < 0) { + startTime = 0L; + } + if (endTime < 0) { + endTime = System.currentTimeMillis(); + } + if(query == null) { + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("Query is null").build(); + } + //convert to nanoseconds since the epoch + startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime); + endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime); + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Query received: " + query); + } + response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()) + , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()) + , startTime + , endTime + , query + , CONFIGURATION.get() + , FileSystem.get(CONFIGURATION.get()) + , new QueryPcapFilter.Configurator() + ) + ); + + } catch (Exception e) { + LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", + e); + throw new WebApplicationException("Unable to fetch Pcaps via MR job", e); + } + + // return http status '200 OK' along with the complete pcaps response file, + // and headers + return Response + .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM) + .status(200).build(); + } /* @@ -109,7 +169,6 @@ private static boolean isValidPort(String port) { */ @GET @Path("/pcapGetter/getPcapsByIdentifiers") - public Response getPcapsByIdentifiers( @QueryParam ("srcIp") String srcIp, @QueryParam ("dstIp") String dstIp, @@ -174,6 +233,7 @@ public Response getPcapsByIdentifiers( , query , CONFIGURATION.get() , FileSystem.get(CONFIGURATION.get()) + , new FixedPcapFilter.Configurator() ) ); diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java index eab4998554..1793b06174 100644 --- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java +++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; import org.junit.Before; @@ -33,33 +34,43 @@ import java.util.List; public class PcapReceiverImplRestEasyTest { - public static class MockQueryHandler extends PcapJob { + + public static class MockQueryHandler extends PcapJob { Path basePath; Path baseOutputPath; long beginNS; long endNS; - EnumMap fields; + R fields; + PcapFilterConfigurator filterImpl; + @Override - public List query( Path basePath + public List query( Path basePath , Path baseOutputPath , long beginNS , long endNS - , EnumMap fields + , T fields , Configuration conf , FileSystem fs + , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { this.basePath = basePath; this.baseOutputPath = baseOutputPath; this.beginNS = beginNS; this.endNS = endNS; - this.fields = fields; + this.fields = (R) fields; + this.filterImpl = (PcapFilterConfigurator) filterImpl; return null; } } - final MockQueryHandler queryHandler = new MockQueryHandler(); - PcapReceiverImplRestEasy restEndpoint = new PcapReceiverImplRestEasy() {{ - this.queryUtil = queryHandler; + + final MockQueryHandler> fixedQueryHandler = new MockQueryHandler>(); + final MockQueryHandler queryQueryHandler = new MockQueryHandler(); + PcapReceiverImplRestEasy fixedRestEndpoint = new PcapReceiverImplRestEasy() {{ + this.queryUtil = fixedQueryHandler; + }}; + PcapReceiverImplRestEasy queryRestEndpoint = new PcapReceiverImplRestEasy() {{ + this.queryUtil = queryQueryHandler; }}; @Before @@ -69,7 +80,7 @@ public void setup() throws Exception { } @Test - public void testNormalPath() throws Exception { + public void testNormalFixedPath() throws Exception { String srcIp = "srcIp"; String dstIp = "dstIp"; String protocol = "protocol"; @@ -80,32 +91,45 @@ public void testNormalPath() throws Exception { { boolean includeReverseTraffic = false; - restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); - Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath); - Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath); - Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT)); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); + Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); } { boolean includeReverseTraffic = true; - restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); - Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath); - Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath); - Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT)); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); + Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); } } + @Test + public void testNormalQueryPath() throws Exception { + long startTime = 100; + long endTime = 1000; + String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; + queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath); + Assert.assertEquals(query, queryQueryHandler.fields); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryQueryHandler.endNS); + } + @Test public void testNullSrcIp() throws Exception { String srcIp = null; @@ -116,16 +140,16 @@ public void testNullSrcIp() throws Exception { long startTime = 100; long endTime = 1000; boolean includeReverseTraffic = false; - restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); - Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath); - Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath); - Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT)); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); + Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); } @Test @@ -138,60 +162,82 @@ public void testNullDstIp() throws Exception { long startTime = 100; long endTime = 1000; boolean includeReverseTraffic = false; - restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); - Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath); - Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath); - Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT)); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); + Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); } @Test public void testEmptyStartTime() throws Exception { String srcIp = "srcIp"; - String dstIp = null; + String dstIp = "dstIp"; String protocol = "protocol"; String srcPort = "80"; String dstPort = "100"; long startTime = -1; long endTime = 1000; - boolean includeReverseTraffic = false; - restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); - Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath); - Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath); - Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT)); - Assert.assertEquals(0, queryHandler.beginNS); - Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS); - Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + { + boolean includeReverseTraffic = false; + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(0, fixedQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS); + Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + } + { + String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; + queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath); + Assert.assertEquals(query, queryQueryHandler.fields); + Assert.assertEquals(0, queryQueryHandler.beginNS); + Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryQueryHandler.endNS); + } } @Test public void testEmptyEndTime() throws Exception { String srcIp = "srcIp"; - String dstIp = null; + String dstIp = "dstIp"; String protocol = "protocol"; String srcPort = "80"; String dstPort = "100"; long startTime = -1; long endTime = -1; - boolean includeReverseTraffic = false; - restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); - Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath); - Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath); - Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR)); - Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR)); - Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT)); - Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT)); - Assert.assertEquals(0, queryHandler.beginNS); - Assert.assertTrue(queryHandler.endNS > 0); - Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + { + boolean includeReverseTraffic = false; + fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath); + Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR)); + Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR)); + Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT)); + Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT)); + Assert.assertEquals(0, fixedQueryHandler.beginNS); + Assert.assertTrue(fixedQueryHandler.endNS > 0); + Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC))); + } + { + String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; + queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null); + Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath); + Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath); + Assert.assertEquals(query, queryQueryHandler.fields); + Assert.assertEquals(0, queryQueryHandler.beginNS); + Assert.assertTrue(queryQueryHandler.endNS > 0); + } } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java index 4c7c2220eb..368244bcd2 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java @@ -17,6 +17,9 @@ */ package org.apache.metron.common; +import java.util.HashMap; +import java.util.Map; + public class Constants { public static final String GLOBAL_CONFIG_NAME = "global"; @@ -29,9 +32,11 @@ public class Constants { public static final String SENSOR_TYPE = "source.type"; public static final String ENRICHMENT_TOPIC = "enrichments"; public static final String ERROR_STREAM = "error"; + public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment"; + public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel"; public static enum Fields { - SRC_ADDR("ip_src_addr") + SRC_ADDR("ip_src_addr") ,SRC_PORT("ip_src_port") ,DST_ADDR("ip_dst_addr") ,DST_PORT("ip_dst_port") @@ -39,16 +44,29 @@ public static enum Fields { ,TIMESTAMP("timestamp") ,INCLUDES_REVERSE_TRAFFIC("includes_reverse_traffic") ; + private static Map nameToField; + + static { + nameToField = new HashMap<>(); + for (Fields f : Fields.values()) { + nameToField.put(f.getName(), f); + } + } + private String name; + Fields(String name) { this.name = name; } + public String getName() { return name; } + + public static Fields fromString(String fieldName) { + return nameToField.get(fieldName); + } } - public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment"; - public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel"; } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java index 64993196c0..3e3334155d 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java @@ -18,12 +18,20 @@ package org.apache.metron.common.query; -import org.antlr.v4.runtime.*; -import org.apache.metron.common.query.generated.*; +import org.antlr.v4.runtime.ANTLRInputStream; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.TokenStream; +import org.apache.metron.common.query.generated.PredicateLexer; +import org.apache.metron.common.query.generated.PredicateParser; + +import static org.apache.commons.lang3.StringUtils.isEmpty; public class PredicateProcessor { public boolean parse(String rule, VariableResolver resolver) { + if (isEmpty(rule)) { + return true; + } ANTLRInputStream input = new ANTLRInputStream(rule); PredicateLexer lexer = new PredicateLexer(input); lexer.removeErrorListeners(); diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java index ad798e2827..c132464eeb 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java @@ -67,6 +67,7 @@ public void testSimpleOps() throws Exception { Assert.assertTrue(run("foo== foo", v -> variableMap.get(v))); Assert.assertTrue(run("empty== ''", v -> variableMap.get(v))); Assert.assertTrue(run("spaced == 'metron is great'", v -> variableMap.get(v))); + Assert.assertTrue(run("", v -> variableMap.get(v))); } @Test @@ -82,7 +83,10 @@ public void testBooleanOps() throws Exception { Assert.assertFalse(run("('casey' == foo) and (FALSE == TRUE)", v -> variableMap.get(v))); Assert.assertFalse(run("'casey' == foo and FALSE", v -> variableMap.get(v))); Assert.assertTrue(run("'casey' == foo and true", v -> variableMap.get(v))); + Assert.assertTrue(run("true", v -> variableMap.get(v))); + Assert.assertTrue(run("TRUE", v -> variableMap.get(v))); } + @Test public void testList() throws Exception { final Map variableMap = new HashMap() {{ @@ -98,6 +102,7 @@ public void testList() throws Exception { Assert.assertFalse(run("foo not in [ 'casey', 'david' ]", v -> variableMap.get(v))); Assert.assertFalse(run("foo not in [ 'casey', 'david' ] and 'casey' == foo", v -> variableMap.get(v))); } + @Test public void testExists() throws Exception { final Map variableMap = new HashMap() {{ @@ -123,6 +128,7 @@ public void testStringFunctions() throws Exception { Assert.assertTrue(run("TO_UPPER(foo) in [ TO_UPPER('casey'), 'david' ] and IN_SUBNET(ip, '192.168.0.0/24')", v -> variableMap.get(v))); Assert.assertFalse(run("TO_LOWER(foo) in [ TO_UPPER('casey'), 'david' ]", v -> variableMap.get(v))); } + @Test public void testLogicalFunctions() throws Exception { final Map variableMap = new HashMap() {{ @@ -149,4 +155,5 @@ public void testLogicalFunctions() throws Exception { Assert.assertFalse(run("IN_SUBNET(ip_dst_addr, '192.168.0.0/24')", v-> variableMap.get(v))); Assert.assertTrue(run("not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))", v-> variableMap.get(v))); } + } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java similarity index 78% rename from metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java rename to metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java index 70c973a301..218d1430c1 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java @@ -19,16 +19,14 @@ package org.apache.metron.pcap; import org.apache.hadoop.conf.Configuration; -import org.apache.metron.*; import org.apache.metron.common.Constants; -import org.apache.metron.pcap.mr.PcapFilter; -import org.apache.metron.pcap.mr.PcapJob; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.junit.Assert; import org.junit.Test; import java.util.EnumMap; -public class PcapFilterTest { +public class FixedPcapFilterTest { @Test public void testTrivialEquality() throws Exception { Configuration config = new Configuration(); @@ -39,9 +37,9 @@ public void testTrivialEquality() throws Exception { put(Constants.Fields.DST_PORT, "1"); put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - PcapJob.addToConfig(fields, config); + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -52,7 +50,8 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } } @@ -66,9 +65,9 @@ public void testReverseTraffic() throws Exception { put(Constants.Fields.DST_PORT, "1"); put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true"); }}; - PcapJob.addToConfig(fields, config); + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -79,10 +78,12 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -93,10 +94,12 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -107,7 +110,8 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertFalse(filter.apply(null)); + filter.configure(config); + Assert.assertFalse(filter.test(null)); } } @Test @@ -119,9 +123,9 @@ public void testMissingDstAddr() throws Exception { put(Constants.Fields.DST_PORT, "1"); put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - PcapJob.addToConfig(fields, config); + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -132,10 +136,12 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -146,7 +152,8 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertFalse(filter.apply(null)); + filter.configure(config); + Assert.assertFalse(filter.test(null)); } } @Test @@ -158,9 +165,9 @@ public void testMissingDstPort() throws Exception { put(Constants.Fields.DST_ADDR, "dst_ip"); put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - PcapJob.addToConfig(fields, config); + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -171,10 +178,12 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -185,10 +194,12 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -199,7 +210,8 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertFalse(filter.apply(null)); + filter.configure(config); + Assert.assertFalse(filter.test(null)); } } @Test @@ -211,9 +223,9 @@ public void testMissingSrcAddr() throws Exception { put(Constants.Fields.DST_PORT, "1"); put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - PcapJob.addToConfig(fields, config); + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -224,7 +236,8 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } } @Test @@ -236,9 +249,9 @@ public void testMissingSrcPort() throws Exception { put(Constants.Fields.DST_PORT, "1"); put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); }}; - PcapJob.addToConfig(fields, config); + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -249,10 +262,12 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } + new FixedPcapFilter.Configurator().addToConfig(fields, config); { - PcapFilter filter = new PcapFilter(config) { + FixedPcapFilter filter = new FixedPcapFilter() { @Override protected EnumMap packetToFields(PacketInfo pi) { return new EnumMap(Constants.Fields.class) {{ @@ -263,7 +278,8 @@ protected EnumMap packetToFields(PacketInfo pi) { }}; } }; - Assert.assertTrue(filter.apply(null)); + filter.configure(config); + Assert.assertTrue(filter.test(null)); } } } diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java new file mode 100644 index 0000000000..f07dc48beb --- /dev/null +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.common.Constants; +import org.apache.metron.pcap.filter.PcapFilter; +import org.apache.metron.pcap.filter.query.QueryPcapFilter; +import org.junit.Assert; +import org.junit.Test; + +import java.util.EnumMap; + +public class QueryPcapFilterTest { + + @Test + public void testEmptyQueryFilter() throws Exception { + Configuration config = new Configuration(); + String query = ""; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + PcapFilter filter = new QueryPcapFilter() { + + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testTrivialEquality() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + PcapFilter filter = new QueryPcapFilter() { + + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testMissingDstAddr() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_port == '1'"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip1"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingDstPort() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_addr == 'dst_ip'"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 100); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 100); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 100); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingSrcAddr() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_port == '0' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testMissingSrcPort() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 0); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected EnumMap packetToFields(PacketInfo pi) { + return new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, 100); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } +} diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 587ec7c407..ff85c1f0b4 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -25,7 +25,8 @@ import com.google.common.collect.Lists; import kafka.consumer.ConsumerIterator; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; @@ -42,6 +43,8 @@ import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.PcapMerger; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; +import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; import org.apache.metron.spout.pcap.Endianness; import org.apache.metron.spout.pcap.scheme.TimestampScheme; @@ -51,7 +54,10 @@ import org.junit.Test; import javax.annotation.Nullable; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; import java.util.*; public class PcapTopologyIntegrationTest { @@ -221,7 +227,6 @@ public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) { } } ); - //.withExistingZookeeper("localhost:2000"); final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath()); @@ -274,6 +279,22 @@ public Void getResult() { , new EnumMap<>(Constants.Fields.class) , new Configuration() , FileSystem.get(new Configuration()) + , new FixedPcapFilter.Configurator() + ); + Assert.assertEquals(results.size(), 2); + } + { + // Ensure that only two pcaps are returned when we look at 4 and 5 + // test with empty query filter + List results = + job.query(new Path(outDir.getAbsolutePath()) + , new Path(queryDir.getAbsolutePath()) + , getTimestamp(4, pcapEntries) + , getTimestamp(5, pcapEntries) + , "" + , new Configuration() + , FileSystem.get(new Configuration()) + , new QueryPcapFilter.Configurator() ); Assert.assertEquals(results.size(), 2); } @@ -289,6 +310,22 @@ public Void getResult() { }} , new Configuration() , FileSystem.get(new Configuration()) + , new FixedPcapFilter.Configurator() + ); + Assert.assertEquals(results.size(), 0); + } + { + // ensure that none get returned since that destination IP address isn't in the dataset + // test with query filter + List results = + job.query(new Path(outDir.getAbsolutePath()) + , new Path(queryDir.getAbsolutePath()) + , getTimestamp(0, pcapEntries) + , getTimestamp(1, pcapEntries) + , "ip_dst_addr == '207.28.210.1'" + , new Configuration() + , FileSystem.get(new Configuration()) + , new QueryPcapFilter.Configurator() ); Assert.assertEquals(results.size(), 0); } @@ -304,6 +341,22 @@ public Void getResult() { }} , new Configuration() , FileSystem.get(new Configuration()) + , new FixedPcapFilter.Configurator() + ); + Assert.assertEquals(results.size(), 0); + } + { + //same with protocol as before with the destination addr + //test with query filter + List results = + job.query(new Path(outDir.getAbsolutePath()) + , new Path(queryDir.getAbsolutePath()) + , getTimestamp(0, pcapEntries) + , getTimestamp(1, pcapEntries) + , "protocol == 'foo'" + , new Configuration() + , FileSystem.get(new Configuration()) + , new QueryPcapFilter.Configurator() ); Assert.assertEquals(results.size(), 0); } @@ -317,6 +370,22 @@ public Void getResult() { , new EnumMap<>(Constants.Fields.class) , new Configuration() , FileSystem.get(new Configuration()) + , new FixedPcapFilter.Configurator() + ); + Assert.assertEquals(results.size(), pcapEntries.size()); + } + { + //make sure I get them all. + //with query filter + List results = + job.query(new Path(outDir.getAbsolutePath()) + , new Path(queryDir.getAbsolutePath()) + , getTimestamp(0, pcapEntries) + , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , "" + , new Configuration() + , FileSystem.get(new Configuration()) + , new QueryPcapFilter.Configurator() ); Assert.assertEquals(results.size(), pcapEntries.size()); } @@ -331,6 +400,34 @@ public Void getResult() { }} , new Configuration() , FileSystem.get(new Configuration()) + , new FixedPcapFilter.Configurator() + ); + Assert.assertTrue(results.size() > 0); + Assert.assertEquals(results.size() + , Iterables.size(filterPcaps(pcapEntries, new Predicate() { + @Override + public boolean apply(@Nullable JSONObject input) { + Object prt = input.get(Constants.Fields.DST_PORT.getName()); + return prt != null && prt.toString().equals("22"); + } + }, withHeaders) + ) + ); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, results); + Assert.assertTrue(baos.toByteArray().length > 0); + } + { + //test with query filter + List results = + job.query(new Path(outDir.getAbsolutePath()) + , new Path(queryDir.getAbsolutePath()) + , getTimestamp(0, pcapEntries) + , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1 + , "ip_dst_port == '22'" + , new Configuration() + , FileSystem.get(new Configuration()) + , new QueryPcapFilter.Configurator() ); Assert.assertTrue(results.size() > 0); Assert.assertEquals(results.size() diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java new file mode 100644 index 0000000000..c7168aa8be --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter; + +import org.apache.metron.pcap.PacketInfo; + +import java.util.Map; +import java.util.function.Predicate; + +public interface PcapFilter extends Predicate { + void configure(Iterable> config); +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java new file mode 100644 index 0000000000..43e79ce984 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter; + +import org.apache.hadoop.conf.Configuration; + +public interface PcapFilterConfigurator { + public static final String PCAP_FILTER_NAME_CONF = "PCAP_FILTER_NAME"; + void addToConfig(T fields, Configuration conf); + String queryToString(T fields); +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java new file mode 100644 index 0000000000..90e39f680e --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter; + +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; +import org.apache.metron.pcap.filter.query.QueryPcapFilter; + +import java.util.function.Function; + +public enum PcapFilters { + FIXED(x -> new FixedPcapFilter()), + QUERY(x -> new QueryPcapFilter()); + + Function filter; + + PcapFilters(Function filter) { + this.filter = filter; + } + + public PcapFilter create() { + // Void arg + return filter.apply(null); + } + +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java new file mode 100644 index 0000000000..ace67815f4 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter.fixed; + +import com.google.common.base.Joiner; +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.common.Constants; +import org.apache.metron.common.query.VariableResolver; +import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.filter.PcapFilter; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; +import org.apache.metron.pcap.filter.PcapFilters; +import org.apache.metron.pcap.filter.query.PcapFieldResolver; + +import java.util.EnumMap; +import java.util.Map; + + +public class FixedPcapFilter implements PcapFilter { + + public static class Configurator implements PcapFilterConfigurator> { + @Override + public void addToConfig(EnumMap fields, Configuration conf) { + for (Map.Entry kv : fields.entrySet()) { + conf.set(kv.getKey().getName(), kv.getValue()); + } + conf.set(PCAP_FILTER_NAME_CONF, PcapFilters.FIXED.name()); + } + + @Override + public String queryToString(EnumMap fields) { + return Joiner.on("_").join(fields.values()); + } + } + + private String srcAddr; + private Integer srcPort; + private String dstAddr; + private Integer dstPort; + private String protocol; + private boolean includesReverseTraffic = false; + + @Override + public void configure(Iterable> config) { + for (Map.Entry kv : config) { + if (kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) { + this.dstAddr = kv.getValue(); + } + if (kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) { + this.srcAddr = kv.getValue(); + } + if (kv.getKey().equals(Constants.Fields.DST_PORT.getName())) { + this.dstPort = Integer.parseInt(kv.getValue()); + } + if (kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) { + this.srcPort = Integer.parseInt(kv.getValue()); + } + if (kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) { + this.protocol = kv.getValue(); + } + if (kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) { + this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue()); + } + } + } + + + @Override + public boolean test(PacketInfo pi) { + VariableResolver resolver = new PcapFieldResolver(packetToFields(pi)); + String srcAddrIn = resolver.resolve(Constants.Fields.SRC_ADDR.getName()); + String srcPortIn = resolver.resolve(Constants.Fields.SRC_PORT.getName()); + String dstAddrIn = resolver.resolve(Constants.Fields.DST_ADDR.getName()); + String dstPortIn = resolver.resolve(Constants.Fields.DST_PORT.getName()); + String protocolIn = resolver.resolve(Constants.Fields.PROTOCOL.getName()); + + if (areMatch(protocol, protocolIn)) { + if (matchesSourceAndDestination(srcAddrIn, srcPortIn, dstAddrIn, dstPortIn)) { + return true; + } else if (includesReverseTraffic) { + return matchesReverseSourceAndDestination(srcAddrIn, srcPortIn, dstAddrIn, dstPortIn); + } + } + return false; + } + + private boolean areMatch(Integer filter, String input) { + if (filter != null) { + return areMatch(filter.toString(), input); + } else { + return true; + } + } + + private boolean areMatch(String filter, String input) { + if (filter != null) { + return input != null && input.equals(filter); + } else { + return true; + } + } + + protected EnumMap packetToFields(PacketInfo pi) { + return PcapHelper.packetToFields(pi); + } + + private boolean matchesSourceAndDestination(String srcAddrComp, + String srcPortComp, + String dstAddrComp, + String dstPortComp) { + boolean isMatch = true; + isMatch &= areMatch(this.srcAddr, srcAddrComp); + isMatch &= areMatch(this.srcPort, srcPortComp); + isMatch &= areMatch(this.dstAddr, dstAddrComp); + isMatch &= areMatch(this.dstPort, dstPortComp); + return isMatch; + } + + private boolean matchesReverseSourceAndDestination(String srcAddr, + String srcPort, + String dstAddr, + String dstPort) { + return matchesSourceAndDestination(dstAddr, dstPort, srcAddr, srcPort); + } + +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java new file mode 100644 index 0000000000..cd0e23ea66 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter.query; + +import org.apache.metron.common.Constants; +import org.apache.metron.common.query.VariableResolver; + +import java.util.EnumMap; + +public class PcapFieldResolver implements VariableResolver { + EnumMap fieldsMap = null; + + public PcapFieldResolver(EnumMap fieldsMap) { + this.fieldsMap = fieldsMap; + } + + @Override + public String resolve(String variable) { + Object obj = fieldsMap.get(Constants.Fields.fromString(variable)); + if (obj != null) { + return obj.toString(); + } else { + return null; + } + } +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java new file mode 100644 index 0000000000..5c7ede2d21 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter.query; + +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.common.Constants; +import org.apache.metron.common.query.PredicateProcessor; +import org.apache.metron.common.query.VariableResolver; +import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.filter.PcapFilter; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; +import org.apache.metron.pcap.filter.PcapFilters; + +import java.util.EnumMap; +import java.util.Map; + +public class QueryPcapFilter implements PcapFilter { + public static final String QUERY_STR_CONFIG = "mql"; + + public static class Configurator implements PcapFilterConfigurator { + @Override + public void addToConfig(String query, Configuration conf) { + conf.set(QUERY_STR_CONFIG, query); + conf.set(PCAP_FILTER_NAME_CONF, PcapFilters.QUERY.name()); + } + + @Override + public String queryToString(String fields) { + return fields.replaceAll("\\w", "_") + .replace(".", "-"); + } + } + + private String queryString = null; + private PredicateProcessor predicateProcessor = new PredicateProcessor(); + + @Override + public void configure(Iterable> config) { + for (Map.Entry entry : config) { + if (entry.getKey().equals(QUERY_STR_CONFIG)) { + queryString = entry.getValue(); + } + } + predicateProcessor.validate(queryString); + } + + @Override + public boolean test(PacketInfo input) { + EnumMap fields = packetToFields(input); + VariableResolver resolver = new PcapFieldResolver(fields); + return predicateProcessor.parse(queryString, resolver); + } + + protected EnumMap packetToFields(PacketInfo pi) { + return PcapHelper.packetToFields(pi); + } +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java deleted file mode 100644 index 2952a0a8c0..0000000000 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.pcap.mr; - -import com.google.common.base.Predicate; -import org.apache.metron.common.Constants; -import org.apache.metron.pcap.PacketInfo; -import org.apache.metron.pcap.PcapHelper; - -import javax.annotation.Nullable; -import java.util.EnumMap; -import java.util.Map; - - -public class PcapFilter implements Predicate { - - private String srcAddr; - private Integer srcPort; - private String dstAddr; - private Integer dstPort; - private String protocol; - private boolean includesReverseTraffic = false; - - - public PcapFilter(Iterable> config) { - for(Map.Entry kv : config) { - if(kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) { - this.dstAddr = kv.getValue(); - } - if(kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) { - this.srcAddr = kv.getValue(); - } - if(kv.getKey().equals(Constants.Fields.DST_PORT.getName())) { - this.dstPort = Integer.parseInt(kv.getValue()); - } - if(kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) { - this.srcPort = Integer.parseInt(kv.getValue()); - } - if(kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) { - this.protocol= kv.getValue(); - } - if(kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) { - this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue()); - } - } - } - - private boolean matchSourceAndDestination(Object srcAddrObj - , Object srcPortObj - , Object dstAddrObj - , Object dstPortObj - ) - { - boolean isMatch = true; - if(srcAddr != null ) { - Object o = srcAddrObj; - isMatch &= o != null && o instanceof String && ((String)o).equals(srcAddr); - } - if(isMatch && srcPort != null ) { - Object o = srcPortObj; - isMatch &= o != null && o.toString().equals(srcPort.toString()); - } - if(isMatch && dstAddr != null ) { - Object o = dstAddrObj; - isMatch &= o != null && o instanceof String && ((String)o).equals(dstAddr); - } - if(isMatch && dstPort != null) { - Object o = dstPortObj; - isMatch &= o != null && o.toString().equals(dstPort.toString()); - } - return isMatch; - } - - - protected EnumMap packetToFields(PacketInfo pi) { - return PcapHelper.packetToFields(pi); - } - - @Override - public boolean apply(@Nullable PacketInfo pi ) { - boolean isMatch = true; - EnumMap input= packetToFields(pi); - Object srcAddrObj = input.get(Constants.Fields.SRC_ADDR); - Object srcPortObj = input.get(Constants.Fields.SRC_PORT); - Object dstAddrObj = input.get(Constants.Fields.DST_ADDR); - Object dstPortObj = input.get(Constants.Fields.DST_PORT); - Object protocolObj = input.get(Constants.Fields.PROTOCOL); - - //first we ensure the protocol matches if you pass one in - if(isMatch && protocol != null ) { - Object o = protocolObj; - isMatch &= o != null && o.toString().equals(protocol); - } - if(isMatch) { - //if we're still a match, then we try to match the source and destination - isMatch &= matchSourceAndDestination(srcAddrObj, srcPortObj, dstAddrObj, dstPortObj); - if (!isMatch && includesReverseTraffic) { - isMatch = true; - //then we have to try the other direction if that the forward direction isn't a match - isMatch &= matchSourceAndDestination(dstAddrObj, dstPortObj, srcAddrObj, srcPortObj); - } - } - return isMatch; - } -} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 0dc6dc9188..a9e569a2a3 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -18,12 +18,12 @@ package org.apache.metron.pcap.mr; -import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; @@ -33,28 +33,31 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.log4j.Logger; -import org.apache.metron.common.Constants; import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.filter.PcapFilter; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; +import org.apache.metron.pcap.filter.PcapFilters; -import javax.annotation.Nullable; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; +import java.util.stream.Collectors; public class PcapJob { private static final Logger LOG = Logger.getLogger(PcapJob.class); - public static class PcapMapper extends Mapper { public static final String START_TS_CONF = "start_ts"; public static final String END_TS_CONF = "end_ts"; PcapFilter filter; long start; long end; + @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); - filter = new PcapFilter(context.getConfiguration()); + filter = PcapFilters.valueOf(context.getConfiguration().get(PcapFilterConfigurator.PCAP_FILTER_NAME_CONF)).create(); + filter.configure(context.getConfiguration()); start = Long.parseUnsignedLong(context.getConfiguration().get(START_TS_CONF)); end = Long.parseUnsignedLong(context.getConfiguration().get(END_TS_CONF)); } @@ -62,7 +65,7 @@ protected void setup(Context context) throws IOException, InterruptedException { @Override protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { if(Long.compareUnsigned(key.get() ,start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) { - boolean send = Iterables.size(Iterables.filter(PcapHelper.toPacketInfo(value.copyBytes()), filter)) > 0; + boolean send = PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter).collect(Collectors.toList()).size() > 0; if (send) { context.write(key, value); } @@ -154,26 +157,23 @@ private List readResults(Path outputPath, Configuration config, FileSyst return ret; } - private static String queryToString(EnumMap fields) { - return Joiner.on("_").join(fields.values()); - } - - public List query(Path basePath + public List query(Path basePath , Path baseOutputPath , long beginNS , long endNS - , EnumMap fields + , T fields , Configuration conf , FileSystem fs + , PcapFilterConfigurator filterImpl ) throws IOException, ClassNotFoundException, InterruptedException { - String fileName = Joiner.on("_").join(beginNS, endNS, queryToString(fields), UUID.randomUUID().toString()); + String fileName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); if(LOG.isDebugEnabled()) { DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG , SimpleDateFormat.LONG ); String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000))); String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000))); - LOG.debug("Executing query " + queryToString(fields) + " on timerange " + from + " to " + to); + LOG.debug("Executing query " + filterImpl.queryToString(fields) + " on timerange " + from + " to " + to); } Path outputPath = new Path(baseOutputPath, fileName); Job job = createJob( basePath @@ -183,6 +183,7 @@ public List query(Path basePath , fields , conf , fs + , filterImpl ); boolean completed = job.waitForCompletion(true); if(completed) { @@ -194,24 +195,21 @@ public List query(Path basePath } - public static void addToConfig(EnumMap fields, Configuration conf) { - for(Map.Entry kv : fields.entrySet()) { - conf.set(kv.getKey().getName(), kv.getValue()); - } - } - public Job createJob( Path basePath + + public Job createJob( Path basePath , Path outputPath , long beginNS , long endNS - , EnumMap fields + , T fields , Configuration conf , FileSystem fs + , PcapFilterConfigurator filterImpl ) throws IOException { conf.set(PcapMapper.START_TS_CONF, Long.toUnsignedString(beginNS)); conf.set(PcapMapper.END_TS_CONF, Long.toUnsignedString(endNS)); - addToConfig(fields, conf); + filterImpl.addToConfig(fields, conf); Job job = new Job(conf); job.setJarByClass(PcapJob.class); job.setMapperClass(PcapJob.PcapMapper.class); From 98915ddd5c89e8b7ce8984c3c9063000d1916b72 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Thu, 12 May 2016 19:56:10 -0400 Subject: [PATCH 2/3] METRON-155 Add tests for Configurator queryToString. Fix javadoc in REST API --- .../pcapservice/PcapReceiverImplRestEasy.java | 45 +++++++----- .../common/query/PredicateProcessor.java | 2 +- .../metron/common/query/QueryParserTest.java | 6 +- .../filter/{query => }/PcapFieldResolver.java | 2 +- .../pcap/filter/fixed/FixedPcapFilter.java | 10 +-- .../pcap/filter/query/QueryPcapFilter.java | 14 ++-- .../filter/fixed/FixedPcapFilterTest.java | 69 +++++++++++++++++++ .../filter/query/QueryPcapFilterTest.java | 58 ++++++++++++++++ 8 files changed, 171 insertions(+), 35 deletions(-) rename metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/{query => }/PcapFieldResolver.java (96%) create mode 100644 metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java create mode 100644 metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java index aa64aff9f1..95103184b2 100644 --- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java +++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -97,15 +97,17 @@ private static boolean isValidPort(String port) { } return false; } - /* - * (non-Javadoc) - * - * @see - * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang - * .String, java.lang.String, java.lang.String, java.lang.String, - * java.lang.String, long, long, boolean, - * javax.servlet.http.HttpServletResponse) - */ + + /** + * Enable filtering PCAP results by query filter string and start/end packet TS + * + * @param query Filter results based on this query + * @param startTime Only return packets originating after this start time + * @param endTime Only return packets originating before this end time + * @param servlet_response + * @return REST response + * @throws IOException + */ @GET @Path("/pcapGetter/getPcapsByQuery") public Response getPcapsByIdentifiers( @@ -155,18 +157,23 @@ public Response getPcapsByIdentifiers( return Response .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM) .status(200).build(); - } - /* - * (non-Javadoc) - * - * @see - * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang - * .String, java.lang.String, java.lang.String, java.lang.String, - * java.lang.String, long, long, boolean, - * javax.servlet.http.HttpServletResponse) - */ + /** + * Enable filtering PCAP results by fixed properties and start/end packet TS + * + * @param srcIp filter value + * @param dstIp filter value + * @param protocol filter value + * @param srcPort filter value + * @param dstPort filter value + * @param startTime filter value + * @param endTime filter value + * @param includeReverseTraffic Indicates if filter should check swapped src/dest addresses and IPs + * @param servlet_response + * @return REST response + * @throws IOException + */ @GET @Path("/pcapGetter/getPcapsByIdentifiers") public Response getPcapsByIdentifiers( diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java index 3e3334155d..26e4da8300 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java @@ -29,7 +29,7 @@ public class PredicateProcessor { public boolean parse(String rule, VariableResolver resolver) { - if (isEmpty(rule)) { + if (rule == null || isEmpty(rule.trim())) { return true; } ANTLRInputStream input = new ANTLRInputStream(rule); diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java index c132464eeb..46600026c2 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java @@ -31,14 +31,14 @@ public void testValidation() throws Exception { PredicateProcessor processor = new PredicateProcessor(); try { processor.validate("'foo'"); - Assert.fail("Invalid rule found to be valid."); + Assert.fail("Invalid rule found to be valid - lone value."); } catch(ParseException e) { } try { processor.validate("enrichedField1 == 'enrichedValue1"); - Assert.fail("Invalid rule found to be valid."); + Assert.fail("Invalid rule found to be valid - unclosed single quotes."); } catch(ParseException e) { @@ -67,7 +67,9 @@ public void testSimpleOps() throws Exception { Assert.assertTrue(run("foo== foo", v -> variableMap.get(v))); Assert.assertTrue(run("empty== ''", v -> variableMap.get(v))); Assert.assertTrue(run("spaced == 'metron is great'", v -> variableMap.get(v))); + Assert.assertTrue(run(null, v -> variableMap.get(v))); Assert.assertTrue(run("", v -> variableMap.get(v))); + Assert.assertTrue(run(" ", v -> variableMap.get(v))); } @Test diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java similarity index 96% rename from metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java rename to metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java index cd0e23ea66..50537e1b42 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/PcapFieldResolver.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.metron.pcap.filter.query; +package org.apache.metron.pcap.filter; import org.apache.metron.common.Constants; import org.apache.metron.common.query.VariableResolver; diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java index ace67815f4..ea46cc3384 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java @@ -27,7 +27,7 @@ import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; -import org.apache.metron.pcap.filter.query.PcapFieldResolver; +import org.apache.metron.pcap.filter.PcapFieldResolver; import java.util.EnumMap; import java.util.Map; @@ -46,7 +46,7 @@ public void addToConfig(EnumMap fields, Configuration @Override public String queryToString(EnumMap fields) { - return Joiner.on("_").join(fields.values()); + return (fields == null ? "" : Joiner.on("_").join(fields.values())); } } @@ -102,11 +102,7 @@ public boolean test(PacketInfo pi) { } private boolean areMatch(Integer filter, String input) { - if (filter != null) { - return areMatch(filter.toString(), input); - } else { - return true; - } + return filter == null || areMatch(filter.toString(), input); } private boolean areMatch(String filter, String input) { diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java index 5c7ede2d21..8355ce61bd 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,6 +24,7 @@ import org.apache.metron.common.query.VariableResolver; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.filter.PcapFieldResolver; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; @@ -43,8 +44,11 @@ public void addToConfig(String query, Configuration conf) { @Override public String queryToString(String fields) { - return fields.replaceAll("\\w", "_") - .replace(".", "-"); + return (fields == null ? "" : + fields.trim().replaceAll("\\s", "_") + .replace(".", "-") + .replace("'", "") + ); } } diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java new file mode 100644 index 0000000000..b75b9c8991 --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter.fixed; + +import org.apache.metron.common.Constants; +import org.junit.Assert; +import org.junit.Test; + +import java.util.EnumMap; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class FixedPcapFilterTest { + + @Test + public void string_representation_of_query_gets_formatted() throws Exception { + final EnumMap fields = new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, "src_ip"); + put(Constants.Fields.SRC_PORT, "0"); + put(Constants.Fields.DST_ADDR, "dst_ip"); + put(Constants.Fields.DST_PORT, "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false"); + }}; + String actual = new FixedPcapFilter.Configurator().queryToString(fields); + String expected = "src_ip_0_dst_ip_1_false"; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + + @Test + public void string_representation_of_empty_fields_empty() throws Exception { + { + final EnumMap fields = new EnumMap(Constants.Fields.class); + String actual = new FixedPcapFilter.Configurator().queryToString(fields); + String expected = ""; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + { + String actual = new FixedPcapFilter.Configurator().queryToString(null); + String expected = ""; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + { + final EnumMap fields = new EnumMap(Constants.Fields.class) {{ + put(Constants.Fields.SRC_ADDR, ""); + put(Constants.Fields.SRC_PORT, ""); + }}; + String actual = new FixedPcapFilter.Configurator().queryToString(fields); + String expected = "_"; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + } + +} diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java new file mode 100644 index 0000000000..061066e114 --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.filter.query; + +import org.junit.Assert; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class QueryPcapFilterTest { + + @Test + public void string_representation_of_query_gets_formatted() throws Exception { + String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; + String actual = new QueryPcapFilter.Configurator().queryToString(query); + String expected = "ip_src_addr_==_srcIp_and_ip_src_port_==_80_and_ip_dst_addr_==_dstIp_and_ip_dst_port_==_100_and_protocol_==_protocol"; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + + @Test + public void string_representation_of_empty_query_empty() throws Exception { + { + String query = ""; + String actual = new QueryPcapFilter.Configurator().queryToString(query); + String expected = ""; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + { + String query = " "; + String actual = new QueryPcapFilter.Configurator().queryToString(query); + String expected = ""; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + { + String query = null; + String actual = new QueryPcapFilter.Configurator().queryToString(query); + String expected = ""; + Assert.assertThat("string representation did not match", actual, equalTo(expected)); + } + } + +} From 68707854c29091e4d2e0684fab976092abb1ce41 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Fri, 13 May 2016 13:58:35 -0400 Subject: [PATCH 3/3] METRON-155 Add Creator interface to metron-common. Document PcapJob mapper and enhance filtering to use short-circuit Stream function. --- .../org/apache/metron/common/Creator.java | 22 ++++++++++++ .../metron/pcap/filter/PcapFilters.java | 32 ++++++++++------- .../pcap/filter/query/QueryPcapFilter.java | 6 ++-- .../org/apache/metron/pcap/mr/PcapJob.java | 15 ++++++-- .../metron/pcap/filter/PcapFiltersTest.java | 35 +++++++++++++++++++ 5 files changed, 92 insertions(+), 18 deletions(-) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java create mode 100644 metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java new file mode 100644 index 0000000000..b813dc5750 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common; + +public interface Creator { + T create(); +} diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java index 90e39f680e..7d9e285fb9 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java @@ -18,24 +18,32 @@ package org.apache.metron.pcap.filter; +import org.apache.metron.common.Creator; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; -import java.util.function.Function; - -public enum PcapFilters { - FIXED(x -> new FixedPcapFilter()), - QUERY(x -> new QueryPcapFilter()); - - Function filter; - - PcapFilters(Function filter) { - this.filter = filter; +public enum PcapFilters implements Creator { + FIXED(new Creator() { + @Override + public PcapFilter create() { + return new FixedPcapFilter(); + } + }), + QUERY(new Creator() { + @Override + public PcapFilter create() { + return new QueryPcapFilter(); + } + }); + + Creator creator; + + PcapFilters(Creator creator) { + this.creator = creator; } public PcapFilter create() { - // Void arg - return filter.apply(null); + return creator.create(); } } diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java index 8355ce61bd..e97ed82b4d 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index a9e569a2a3..3543b1d81d 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.log4j.Logger; +import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; @@ -42,7 +43,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; -import java.util.stream.Collectors; +import java.util.stream.Stream; public class PcapJob { private static final Logger LOG = Logger.getLogger(PcapJob.class); @@ -64,13 +65,21 @@ protected void setup(Context context) throws IOException, InterruptedException { @Override protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { - if(Long.compareUnsigned(key.get() ,start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) { - boolean send = PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter).collect(Collectors.toList()).size() > 0; + if (Long.compareUnsigned(key.get(), start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) { + // It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1 + // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo + // objects back to byte arrays, otherwise we could support more than one packet. + // Note: short-circuit findAny() func on stream + boolean send = filteredPacketInfo(value).findAny().isPresent(); if (send) { context.write(key, value); } } } + + private Stream filteredPacketInfo(BytesWritable value) throws IOException { + return PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter); + } } public static class PcapReducer extends Reducer { diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java new file mode 100644 index 0000000000..75871e9a6e --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.pcap.filter; + +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; +import org.apache.metron.pcap.filter.query.QueryPcapFilter; +import org.junit.Assert; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; + +public class PcapFiltersTest { + + @Test + public void creates_pcap_filters() throws Exception { + Assert.assertThat("filter type should be Fixed", PcapFilters.FIXED.create(), instanceOf(FixedPcapFilter.class)); + Assert.assertThat("filter type should be Query", PcapFilters.QUERY.create(), instanceOf(QueryPcapFilter.class)); + } + +}