diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index b76712bca5..d32e9fd37b 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -199,10 +199,11 @@ Request and Response objects are JSON formatted. The JSON schemas are available | [ `GET /api/v1/kafka/topic/{name}`](#get-apiv1kafkatopicname)| | [ `DELETE /api/v1/kafka/topic/{name}`](#delete-apiv1kafkatopicname)| | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)| -| [ `GET /api/v1/search/search`](#get-apiv1searchsearch)| +| [ `POST /api/v1/search/search`](#get-apiv1searchsearch)| +| [ `POST /api/v1/search/group`](#get-apiv1searchgroup)| | [ `GET /api/v1/search/findOne`](#get-apiv1searchfindone)| -| [ `GET /api/v1/search/search`](#get-apiv1searchcolumnmetadata)| -| [ `GET /api/v1/search/search`](#get-apiv1searchcolumnmetadatacommon)| +| [ `GET /api/v1/search/column/metadata`](#get-apiv1searchcolumnmetadata)| +| [ `GET /api/v1/search/column/metadata/common`](#get-apiv1searchcolumnmetadatacommon)| | [ `GET /api/v1/sensor/enrichment/config`](#get-apiv1sensorenrichmentconfig)| | [ `GET /api/v1/sensor/enrichment/config/list/available/enrichments`](#get-apiv1sensorenrichmentconfiglistavailableenrichments)| | [ `GET /api/v1/sensor/enrichment/config/list/available/threat/triage/aggregators`](#get-apiv1sensorenrichmentconfiglistavailablethreattriageaggregators)| @@ -353,6 +354,24 @@ Request and Response objects are JSON formatted. The JSON schemas are available * 200 - Returns sample message * 404 - Either Kafka topic is missing or contains no messages +### `POST /api/v1/search/search` + * Description: Searches the indexing store + * Input: + * searchRequest - Search request + * Returns: + * 200 - Search response + +### `POST /api/v1/search/group` + * Description: Searches the indexing store and returns field groups. Groups are hierarchical and nested in the order the fields appear in the 'groups' request parameter. The default sorting within groups is by count descending. A groupOrder type of count will sort based on then number of documents in a group while a groupType of term will sort by the groupBy term. + * Input: + * groupRequest - Group request + * indices - list of indices to search + * query - lucene query + * scoreField - field used to compute a total score for each group + * groups - List of groups (field name and sort order) + * Returns: + * 200 - Group response + ### `GET /api/v1/search/findOne` * Description: Returns latest document for a guid and sensor * Input: @@ -369,13 +388,6 @@ Request and Response objects are JSON formatted. The JSON schemas are available * Returns: * 200 - Document representing the output * 404 - Document with UUID and sensor type not found - -### `GET /api/v1/search/search` - * Description: Searches the indexing store - * Input: - * searchRequest - Search request - * Returns: - * 200 - Search results ### `GET /api/v1/search/column/metadata` * Description: Get column metadata for each index in the list of indicies diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index 11310d41f8..4cf23bbc4a 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -56,6 +56,7 @@ public class MetronRestConstants { public static final String KERBEROS_KEYTAB_SPRING_PROPERTY = "kerberos.keytab"; public static final String SEARCH_MAX_RESULTS = "search.max.results"; + public static final String SEARCH_MAX_GROUPS = "search.max.groups"; public static final String INDEX_DAO_IMPL = "index.dao.impl"; public static final String INDEX_HBASE_TABLE_PROVIDER_IMPL = "index.hbase.provider"; } diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java index 63851168a3..b6ac5e775e 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java @@ -17,6 +17,8 @@ */ package org.apache.metron.rest.config; +import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL; + import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; import org.apache.metron.indexing.dao.AccessConfig; @@ -28,14 +30,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; -import java.lang.reflect.InvocationTargetException; - -import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL; -import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; - @Configuration public class IndexConfig { @@ -55,9 +51,11 @@ public IndexDao indexDao() { try { String hbaseProviderImpl = environment.getProperty(MetronRestConstants.INDEX_HBASE_TABLE_PROVIDER_IMPL, String.class, null); String indexDaoImpl = environment.getProperty(MetronRestConstants.INDEX_DAO_IMPL, String.class, null); - int searchMaxResults = environment.getProperty(MetronRestConstants.SEARCH_MAX_RESULTS, Integer.class, -1); + int searchMaxResults = environment.getProperty(MetronRestConstants.SEARCH_MAX_RESULTS, Integer.class, 1000); + int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000); AccessConfig config = new AccessConfig(); config.setMaxSearchResults(searchMaxResults); + config.setMaxSearchGroups(searchMaxGroups); config.setGlobalConfigSupplier(() -> { try { return globalConfigService.get(); diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java index dea628c285..e21541350f 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java @@ -21,7 +21,8 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import org.apache.metron.indexing.dao.search.GetRequest; -import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.RestException; import org.apache.metron.indexing.dao.search.SearchRequest; @@ -38,7 +39,6 @@ import java.util.Map; import java.util.Optional; import java.util.List; -import java.util.Map; @RestController @RequestMapping("/api/v1/search") @@ -54,6 +54,15 @@ ResponseEntity search(final @ApiParam(name = "searchRequest", va return new ResponseEntity<>(searchService.search(searchRequest), HttpStatus.OK); } + @ApiOperation(value = "Searches the indexing store and returns field groups. " + + "Groups are hierarchical and nested in the order the fields appear in the 'groups' request parameter. " + + "The default sorting within groups is by count descending.") + @ApiResponse(message = "Group response", code = 200) + @RequestMapping(value = "/group", method = RequestMethod.POST) + ResponseEntity group(final @ApiParam(name = "groupRequest", value = "Group request", required = true) @RequestBody GroupRequest groupRequest) throws RestException { + return new ResponseEntity<>(searchService.group(groupRequest), HttpStatus.OK); + } + @ApiOperation(value = "Returns latest document for a guid and sensor") @ApiResponse(message = "Document representing the output", code = 200) @RequestMapping(value = "/findOne", method = RequestMethod.POST) diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java index ea0ae812e9..589976534d 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java @@ -18,7 +18,8 @@ package org.apache.metron.rest.service; import org.apache.metron.indexing.dao.search.GetRequest; -import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.RestException; import org.apache.metron.indexing.dao.search.SearchRequest; @@ -27,11 +28,11 @@ import java.util.Map; import java.util.Optional; import java.util.List; -import java.util.Map; public interface SearchService { SearchResponse search(SearchRequest searchRequest) throws RestException; + GroupResponse group(GroupRequest groupRequest) throws RestException; Optional> getLatest(GetRequest request) throws RestException; Map> getColumnMetadata(List indices) throws RestException; Map getCommonColumnMetadata(List indices) throws RestException; diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java index bdf6037bb6..d865e0e66d 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java @@ -19,10 +19,11 @@ import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SearchService; @@ -34,7 +35,6 @@ import java.util.Map; import java.util.Optional; import java.util.List; -import java.util.Map; @Service public class SearchServiceImpl implements SearchService { @@ -57,6 +57,16 @@ public SearchResponse search(SearchRequest searchRequest) throws RestException { } } + @Override + public GroupResponse group(GroupRequest groupRequest) throws RestException { + try { + return dao.group(groupRequest); + } + catch(InvalidSearchException ise) { + throw new RestException(ise.getMessage(), ise); + } + } + @Override public Optional> getLatest(GetRequest request) throws RestException { try { diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml index 4aff769c14..1c50e0b8c8 100644 --- a/metron-interface/metron-rest/src/main/resources/application.yml +++ b/metron-interface/metron-rest/src/main/resources/application.yml @@ -44,6 +44,7 @@ curator: search: max: results: 1000 + groups: 1000 index: dao: diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java index e75c356cee..645e5251ae 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java @@ -24,7 +24,6 @@ import org.json.simple.parser.ParseException; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -44,7 +43,6 @@ import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; @@ -174,6 +172,22 @@ public void test() throws Exception { .andExpect(jsonPath("$.responseCode").value(500)) .andExpect(jsonPath("$.message").value("Search result size must be less than 100")); + this.mockMvc.perform(post(searchUrl + "/group").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(SearchIntegrationTest.groupByQuery)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(2))) + .andExpect(jsonPath("$.groupedBy").value("is_alert")) + .andExpect(jsonPath("$.groupResults.*", hasSize(1))) + .andExpect(jsonPath("$.groupResults[0].*", hasSize(5))) + .andExpect(jsonPath("$.groupResults[0].key").value("is_alert_value")) + .andExpect(jsonPath("$.groupResults[0].total").value(10)) + .andExpect(jsonPath("$.groupResults[0].groupedBy").value("latitude")) + .andExpect(jsonPath("$.groupResults[0].groupResults.*", hasSize(1))) + .andExpect(jsonPath("$.groupResults[0].groupResults[0].*", hasSize(3))) + .andExpect(jsonPath("$.groupResults[0].groupResults[0].key").value("latitude_value")) + .andExpect(jsonPath("$.groupResults[0].groupResults[0].total").value(10)) + .andExpect(jsonPath("$.groupResults[0].groupResults[0].score").value(50)); + this.mockMvc.perform(post(searchUrl + "/column/metadata").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\",\"snort\"]")) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 01c113c5ab..0d7a76c36b 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -19,59 +19,59 @@ import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; -import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.search.*; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.Group; +import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.elasticsearch.action.get.GetRequestBuilder; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.*; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.mapper.ip.IpFieldMapper; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; -import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.*; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import java.io.IOException; -import java.util.Arrays; -import java.util.Date; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; public class ElasticsearchDao implements IndexDao { private transient TransportClient client; @@ -115,26 +115,17 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx .size(searchRequest.getSize()) .from(searchRequest.getFrom()) .query(new QueryStringQueryBuilder(searchRequest.getQuery())) - .trackScores(true); + searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder()))); Optional> fields = searchRequest.getFields(); if (fields.isPresent()) { searchSourceBuilder.fields(fields.get()); } else { searchSourceBuilder.fetchSource(true); } - for (SortField sortField : searchRequest.getSort()) { - FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(sortField.getField()); - if (sortField.getSortOrder() == org.apache.metron.indexing.dao.search.SortOrder.DESC) { - fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.DESC); - } else { - fieldSortBuilder.order(org.elasticsearch.search.sort.SortOrder.ASC); - } - searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder); - } Optional> facetFields = searchRequest.getFacetFields(); if (facetFields.isPresent()) { - addFacetFields(searchSourceBuilder, facetFields.get()); + facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new TermsBuilder(getFacentAggregationName(field)).field(field))); } String[] wildcardIndices = searchRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[searchRequest.getIndices().size()]); org.elasticsearch.action.search.SearchResponse elasticsearchResponse; @@ -146,23 +137,8 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx } SearchResponse searchResponse = new SearchResponse(); searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits()); - searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); - Map source; - if (fields.isPresent()) { - source = new HashMap<>(); - searchHit.getFields().forEach((key, value) -> { - source.put(key, value.getValues().size() == 1 ? value.getValue() : value.getValues()); - }); - } else { - source = searchHit.getSource(); - } - searchResult.setSource(source); - searchResult.setScore(searchHit.getScore()); - searchResult.setIndex(searchHit.getIndex()); - return searchResult; - }).collect(Collectors.toList())); + searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> + getSearchResult(searchHit, fields.isPresent())).collect(Collectors.toList())); if (facetFields.isPresent()) { Map commonColumnMetadata; try { @@ -175,6 +151,37 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx return searchResponse; } + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + if(client == null) { + throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); + } + if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { + throw new InvalidSearchException("At least 1 group must be provided."); + } + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(new QueryStringQueryBuilder(groupRequest.getQuery())); + searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0)); + String[] wildcardIndices = groupRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[groupRequest.getIndices().size()]); + org.elasticsearch.action.search.SearchResponse elasticsearchResponse; + try { + elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices) + .source(searchSourceBuilder)).actionGet(); + } catch (SearchPhaseExecutionException e) { + throw new InvalidSearchException("Could not execute search", e); + } + Map commonColumnMetadata; + try { + commonColumnMetadata = getCommonColumnMetadata(groupRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(groupRequest.getIndices().toArray()))); + } + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, elasticsearchResponse.getAggregations(), commonColumnMetadata)); + return groupResponse; + } + @Override public synchronized void init(AccessConfig config) { if(this.client == null) { @@ -330,9 +337,17 @@ protected String[] getLatestIndices(List includeIndices) { return latestIndices.values().toArray(new String[latestIndices.size()]); } - public void addFacetFields(SearchSourceBuilder searchSourceBuilder, List fields) { - for(String field: fields) { - searchSourceBuilder = searchSourceBuilder.aggregation(new TermsBuilder(getAggregationName(field)).field(field)); + private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( + org.apache.metron.indexing.dao.search.SortOrder sortOrder) { + return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ? + org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; + } + + private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { + if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { + return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : Order.term(false); + } else { + return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : Order.count(false); } } @@ -340,33 +355,94 @@ public Map> getFacetCounts(List fields, Aggreg Map> fieldCounts = new HashMap<>(); for (String field: fields) { Map valueCounts = new HashMap<>(); - Aggregation aggregation = aggregations.get(getAggregationName(field)); - if (aggregation instanceof LongTerms) { - LongTerms longTerms = (LongTerms) aggregation; - FieldType type = commonColumnMetadata.get(field); - if (FieldType.IP.equals(type)) { - longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(IpFieldMapper.longToIp((Long) bucket.getKey()), bucket.getDocCount())); - } else if (FieldType.BOOLEAN.equals(type)) { - longTerms.getBuckets().stream().forEach(bucket -> { - String key = (Long) bucket.getKey() == 1 ? "true" : "false"; - valueCounts.put(key, bucket.getDocCount()); - }); - } else { - longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); - } - } else if (aggregation instanceof DoubleTerms) { - DoubleTerms doubleTerms = (DoubleTerms) aggregation; - doubleTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); - } else if (aggregation instanceof StringTerms) { - StringTerms stringTerms = (StringTerms) aggregation; - stringTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); + Aggregation aggregation = aggregations.get(getFacentAggregationName(field)); + if (aggregation instanceof Terms) { + Terms terms = (Terms) aggregation; + terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); } fieldCounts.put(field, valueCounts); } return fieldCounts; } - private String getAggregationName(String field) { + private String formatKey(Object key, FieldType type) { + if (FieldType.IP.equals(type)) { + return IpFieldMapper.longToIp((Long) key); + } else if (FieldType.BOOLEAN.equals(type)) { + return (Long) key == 1 ? "true" : "false"; + } else { + return key.toString(); + } + } + + private TermsBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { + List groups = groupRequest.getGroups(); + Group group = groups.get(index); + String aggregationName = getGroupByAggregationName(group.getField()); + TermsBuilder termsBuilder = new TermsBuilder(aggregationName) + .field(group.getField()) + .size(accessConfig.getMaxSearchGroups()) + .order(getElasticsearchGroupOrder(group.getOrder())); + if (index < groups.size() - 1) { + termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); + } + Optional scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + termsBuilder.subAggregation(new SumBuilder(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0)); + } + return termsBuilder; + } + + private List getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map commonColumnMetadata) { + List groups = groupRequest.getGroups(); + String field = groups.get(index).getField(); + Terms terms = aggregations.get(getGroupByAggregationName(field)); + List searchResultGroups = new ArrayList<>(); + for(Bucket bucket: terms.getBuckets()) { + GroupResult groupResult = new GroupResult(); + groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); + groupResult.setTotal(bucket.getDocCount()); + Optional scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); + groupResult.setScore(score.getValue()); + } + if (index < groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); + } + searchResultGroups.add(groupResult); + } + return searchResultGroups; + } + + private SearchResult getSearchResult(SearchHit searchHit, boolean fieldsPresent) { + SearchResult searchResult = new SearchResult(); + searchResult.setId(searchHit.getId()); + Map source; + if (fieldsPresent) { + source = new HashMap<>(); + searchHit.getFields().forEach((key, value) -> { + source.put(key, value.getValues().size() == 1 ? value.getValue() : value.getValues()); + }); + } else { + source = searchHit.getSource(); + } + searchResult.setSource(source); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + + private String getFacentAggregationName(String field) { return String.format("%s_count", field); } + + private String getGroupByAggregationName(String field) { + return String.format("%s_group", field); + } + + private String getSumAggregationName(String field) { + return String.format("%s_score", field); + } } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index d794ac9d65..5de9fd2a1f 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -50,7 +50,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { * "long_field": { "type": "long" }, * "timestamp" : { "type": "date" }, * "latitude" : { "type": "float" }, - * "double_field": { "type": "double" }, + * "score": { "type": "double" }, * "is_alert": { "type": "boolean" }, * "location_point": { "type": "geo_point" }, * "bro_field": { "type": "string" }, @@ -72,7 +72,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { * "long_field": { "type": "long" }, * "timestamp" : { "type": "date" }, * "latitude" : { "type": "float" }, - * "double_field": { "type": "double" }, + * "score": { "type": "double" }, * "is_alert": { "type": "boolean" }, * "location_point": { "type": "geo_point" }, * "snort_field": { "type": "integer" }, @@ -91,6 +91,7 @@ protected IndexDao createDao() throws Exception { ret.init( new AccessConfig() {{ setMaxSearchResults(100); + setMaxSearchGroups(100); setGlobalConfigSupplier( () -> new HashMap() {{ put("es.clustername", "metron"); diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java index ddb88e5895..4f47a65228 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java @@ -17,7 +17,6 @@ */ package org.apache.metron.indexing.dao; -import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; import java.util.HashMap; @@ -26,6 +25,7 @@ public class AccessConfig { private Integer maxSearchResults; + private Integer maxSearchGroups; private Supplier> globalConfigSupplier; private Map optionalSettings = new HashMap<>(); private TableProvider tableProvider = null; @@ -54,6 +54,18 @@ public void setMaxSearchResults(Integer maxSearchResults) { this.maxSearchResults = maxSearchResults; } + /** + * The maximum search groups. + * @return + */ + public Integer getMaxSearchGroups() { + return maxSearchGroups; + } + + public void setMaxSearchGroups(Integer maxSearchGroups) { + this.maxSearchGroups = maxSearchGroups; + } + /** * Get optional settings for initializing indices. * @return diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java index a1cf39884a..c890544754 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java @@ -27,6 +27,8 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -64,6 +66,11 @@ public synchronized SearchResponse search(SearchRequest searchRequest) throws In return null; } + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return null; + } + @Override public synchronized void init(AccessConfig config) { if(this.tableInterface == null) { diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java index 350e402ed5..745dccd9a8 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java @@ -22,6 +22,8 @@ import com.flipkart.zjsonpatch.JsonPatch; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -33,7 +35,6 @@ import java.io.IOException; import org.apache.metron.indexing.dao.search.FieldType; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -49,6 +50,8 @@ public interface IndexDao { */ SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException; + GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException; + /** * Initialize the DAO with the AccessConfig object. * @param config diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java index e9a4a9a299..61c62318a1 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java @@ -22,6 +22,8 @@ import com.google.common.collect.Iterables; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -117,6 +119,17 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx return null; } + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + for(IndexDao dao : indices) { + GroupResponse s = dao.group(groupRequest); + if(s != null) { + return s; + } + } + return null; + } + @Override public void init(AccessConfig config) { for(IndexDao dao : indices) { diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java new file mode 100644 index 0000000000..be020266af --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.search; + +public class Group { + + private GroupOrder order; + private String field; + + public Group() { + order = new GroupOrder(); + order.setGroupOrderType(GroupOrderType.TERM.toString()); + order.setSortOrder(SortOrder.DESC.toString()); + } + + public GroupOrder getOrder() { + return order; + } + + public void setOrder(GroupOrder order) { + this.order = order; + } + + public String getField() { + return field; + } + + public void setField(String field) { + this.field = field; + } +} diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java new file mode 100644 index 0000000000..b90c4380e4 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.search; + +public class GroupOrder { + + private SortOrder sortOrder; + private GroupOrderType groupOrderType; + + public SortOrder getSortOrder() { + return sortOrder; + } + + public void setSortOrder(String sortOrder) { + this.sortOrder = SortOrder.fromString(sortOrder); + } + + public GroupOrderType getGroupOrderType() { + return groupOrderType; + } + + public void setGroupOrderType(String groupOrderType) { + this.groupOrderType = GroupOrderType.fromString(groupOrderType); + } +} diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java new file mode 100644 index 0000000000..8444e5007f --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.search; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum GroupOrderType { + + @JsonProperty("count") + COUNT("count"), + @JsonProperty("term") + TERM("term"); + + private String groupOrderType; + + GroupOrderType(String groupOrderType) { + this.groupOrderType = groupOrderType; + } + + public String getGroupOrderType() { + return groupOrderType; + } + + public static GroupOrderType fromString(String groupOrderType) { + return GroupOrderType.valueOf(groupOrderType.toUpperCase()); + } +} diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java new file mode 100644 index 0000000000..121da1004c --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.search; + +import java.util.List; +import java.util.Optional; + +public class GroupRequest { + + private List indices; + private String query; + private String scoreField; + private List groups; + + public List getIndices() { + return indices; + } + + public void setIndices(List indices) { + this.indices = indices; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public Optional getScoreField() { + return scoreField == null ? Optional.empty() : Optional.of(scoreField); + } + + public void setScoreField(String scoreField) { + this.scoreField = scoreField; + } + + public List getGroups() { + return groups; + } + + public void setGroups(List groups) { + this.groups = groups; + } +} diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java new file mode 100644 index 0000000000..1b426098a1 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.search; + +import java.util.List; + +public class GroupResponse { + + private String groupedBy; + private List groupResults; + + public String getGroupedBy() { + return groupedBy; + } + + public void setGroupedBy(String groupedBy) { + this.groupedBy = groupedBy; + } + + public List getGroupResults() { + return groupResults; + } + + public void setGroupResults(List groupResults) { + this.groupResults = groupResults; + } +} diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java new file mode 100644 index 0000000000..d40f1462e4 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.indexing.dao.search; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.List; + +public class GroupResult { + + private String key; + private long total; + private Double score; + private String groupedBy; + private List groupResults; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public long getTotal() { + return total; + } + + public void setTotal(long total) { + this.total = total; + } + + public Double getScore() { + return score; + } + + public void setScore(Double score) { + this.score = score; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getGroupedBy() { + return groupedBy; + } + + public void setGroupedBy(String groupedBy) { + this.groupedBy = groupedBy; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public List getGroupResults() { + return groupResults; + } + + public void setGroupResults(List groups) { + this.groupResults = groups; + } +} diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java index 2d146e0a8e..6e48b584e1 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java @@ -22,7 +22,6 @@ import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.*; import org.apache.metron.indexing.dao.update.Document; @@ -77,6 +76,27 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx return ret; } + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest.getGroups(), 0)); + return groupResponse; + } + + private List getGroupResults(List groups, int index) { + Group group = groups.get(index); + GroupResult groupResult = new GroupResult(); + groupResult.setKey(group.getField() + "_value"); + if (index < groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groups, index + 1)); + } else { + groupResult.setScore(50.0); + } + groupResult.setTotal(10); + return Collections.singletonList(groupResult); + } private static class ComparableComparator implements Comparator { SortOrder order = null; diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 2645df2f22..0db8e37623 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -20,10 +20,13 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.GroupResult; import org.apache.metron.integration.InMemoryComponent; import org.junit.After; import org.junit.Assert; @@ -40,11 +43,11 @@ public abstract class SearchIntegrationTest { /** * [ - * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} + * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} * ] */ @Multiline @@ -52,11 +55,11 @@ public abstract class SearchIntegrationTest { /** * [ - * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} + * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} * ] */ @Multiline @@ -149,7 +152,7 @@ public abstract class SearchIntegrationTest { /** * { - * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"], + * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "score", "is_alert"], * "indices": ["bro", "snort"], * "query": "*", * "from": 0, @@ -253,6 +256,63 @@ public abstract class SearchIntegrationTest { @Multiline public static String noResultsFieldsQuery; + /** + * { + * "groups": [ + * { + * "field":"is_alert" + * }, + * { + * "field":"latitude" + * } + * ], + * "scoreField":"score", + * "indices": ["bro", "snort"], + * "query": "*" + * } + */ + @Multiline + public static String groupByQuery; + + /** + * { + * "groups": [ + * { + * "field":"is_alert", + * "order": { + * "groupOrderType": "count", + * "sortOrder": "ASC" + * } + * }, + * { + * "field":"ip_src_addr", + * "order": { + * "groupOrderType": "term", + * "sortOrder": "DESC" + * } + * } + * ], + * "indices": ["bro", "snort"], + * "query": "*" + * } + */ + @Multiline + public static String sortedGroupByQuery; + + /** + * { + * "groups": [ + * { + * "field":"location_point" + * } + * ], + * "indices": ["bro", "snort"], + * "query": "*" + * } + */ + @Multiline + public static String badGroupQuery; + protected static IndexDao dao; protected static InMemoryComponent indexComponent; @@ -387,14 +447,18 @@ public void test() throws Exception { Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001); Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0))); Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1))); - Map doubleFieldCounts = facetCounts.get("double_field"); - Assert.assertEquals(2, doubleFieldCounts.size()); - List doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet()); - Collections.sort(doubleFieldKeys); - Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001); - Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001); - Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0))); - Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1))); + Map scoreFieldCounts = facetCounts.get("score"); + Assert.assertEquals(4, scoreFieldCounts.size()); + List scoreFieldKeys = new ArrayList<>(scoreFieldCounts.keySet()); + Collections.sort(scoreFieldKeys); + Assert.assertEquals(10.0, Double.parseDouble(scoreFieldKeys.get(0)), 0.00001); + Assert.assertEquals(20.0, Double.parseDouble(scoreFieldKeys.get(1)), 0.00001); + Assert.assertEquals(50.0, Double.parseDouble(scoreFieldKeys.get(2)), 0.00001); + Assert.assertEquals(98.0, Double.parseDouble(scoreFieldKeys.get(3)), 0.00001); + Assert.assertEquals(new Long(4), scoreFieldCounts.get(scoreFieldKeys.get(0))); + Assert.assertEquals(new Long(2), scoreFieldCounts.get(scoreFieldKeys.get(1))); + Assert.assertEquals(new Long(3), scoreFieldCounts.get(scoreFieldKeys.get(2))); + Assert.assertEquals(new Long(1), scoreFieldCounts.get(scoreFieldKeys.get(3))); Map isAlertCounts = facetCounts.get("is_alert"); Assert.assertEquals(2, isAlertCounts.size()); Assert.assertEquals(new Long(6), isAlertCounts.get("true")); @@ -440,7 +504,7 @@ public void test() throws Exception { Assert.assertEquals(FieldType.LONG, broTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field")); + Assert.assertEquals(FieldType.DOUBLE, broTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert")); Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point")); Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field")); @@ -453,7 +517,7 @@ public void test() throws Exception { Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field")); + Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert")); Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point")); Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field")); @@ -486,7 +550,7 @@ public void test() throws Exception { Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); } @@ -527,6 +591,145 @@ public void test() throws Exception { SearchResponse response = dao.search(request); Assert.assertEquals(0, response.getTotal()); } + // Group by test case, default order is count descending + { + GroupRequest request = JSONUtils.INSTANCE.load(groupByQuery, GroupRequest.class); + GroupResponse response = dao.group(request); + Assert.assertEquals("is_alert", response.getGroupedBy()); + List isAlertGroups = response.getGroupResults(); + Assert.assertEquals(2, isAlertGroups.size()); + + // isAlert == true group + GroupResult trueGroup = isAlertGroups.get(0); + Assert.assertEquals("true", trueGroup.getKey()); + Assert.assertEquals(6, trueGroup.getTotal()); + Assert.assertEquals("latitude", trueGroup.getGroupedBy()); + Assert.assertEquals(198.0, trueGroup.getScore(), 0.00001); + List trueLatitudeGroups = trueGroup.getGroupResults(); + Assert.assertEquals(2, trueLatitudeGroups.size()); + + + // isAlert == true && latitude == 48.5839 group + GroupResult trueLatitudeGroup2 = trueLatitudeGroups.get(0); + Assert.assertEquals(48.5839, Double.parseDouble(trueLatitudeGroup2.getKey()), 0.00001); + Assert.assertEquals(5, trueLatitudeGroup2.getTotal()); + Assert.assertEquals(148.0, trueLatitudeGroup2.getScore(), 0.00001); + + // isAlert == true && latitude == 48.0001 group + GroupResult trueLatitudeGroup1 = trueLatitudeGroups.get(1); + Assert.assertEquals(48.0001, Double.parseDouble(trueLatitudeGroup1.getKey()), 0.00001); + Assert.assertEquals(1, trueLatitudeGroup1.getTotal()); + Assert.assertEquals(50.0, trueLatitudeGroup1.getScore(), 0.00001); + + // isAlert == false group + GroupResult falseGroup = isAlertGroups.get(1); + Assert.assertEquals("false", falseGroup.getKey()); + Assert.assertEquals("latitude", falseGroup.getGroupedBy()); + Assert.assertEquals(130.0, falseGroup.getScore(), 0.00001); + List falseLatitudeGroups = falseGroup.getGroupResults(); + Assert.assertEquals(2, falseLatitudeGroups.size()); + + // isAlert == false && latitude == 48.5839 group + GroupResult falseLatitudeGroup2 = falseLatitudeGroups.get(0); + Assert.assertEquals(48.5839, Double.parseDouble(falseLatitudeGroup2.getKey()), 0.00001); + Assert.assertEquals(3, falseLatitudeGroup2.getTotal()); + Assert.assertEquals(80.0, falseLatitudeGroup2.getScore(), 0.00001); + + // isAlert == false && latitude == 48.0001 group + GroupResult falseLatitudeGroup1 = falseLatitudeGroups.get(1); + Assert.assertEquals(48.0001, Double.parseDouble(falseLatitudeGroup1.getKey()), 0.00001); + Assert.assertEquals(1, falseLatitudeGroup1.getTotal()); + Assert.assertEquals(50.0, falseLatitudeGroup1.getScore(), 0.00001); + } + // Group by with sorting test case where is_alert is sorted by count ascending and ip_src_addr is sorted by term descending + { + GroupRequest request = JSONUtils.INSTANCE.load(sortedGroupByQuery, GroupRequest.class); + GroupResponse response = dao.group(request); + Assert.assertEquals("is_alert", response.getGroupedBy()); + List isAlertGroups = response.getGroupResults(); + Assert.assertEquals(2, isAlertGroups.size()); + + // isAlert == false group + GroupResult falseGroup = isAlertGroups.get(0); + Assert.assertEquals(4, falseGroup.getTotal()); + Assert.assertEquals("ip_src_addr", falseGroup.getGroupedBy()); + List falseIpSrcAddrGroups = falseGroup.getGroupResults(); + Assert.assertEquals(4, falseIpSrcAddrGroups.size()); + + // isAlert == false && ip_src_addr == 192.168.1.8 group + GroupResult falseIpSrcAddrGroup1 = falseIpSrcAddrGroups.get(0); + Assert.assertEquals("192.168.1.8", falseIpSrcAddrGroup1.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup1.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup1.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup1.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.7 group + GroupResult falseIpSrcAddrGroup2 = falseIpSrcAddrGroups.get(1); + Assert.assertEquals("192.168.1.7", falseIpSrcAddrGroup2.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup2.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup2.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup2.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.6 group + GroupResult falseIpSrcAddrGroup3 = falseIpSrcAddrGroups.get(2); + Assert.assertEquals("192.168.1.6", falseIpSrcAddrGroup3.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup3.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup3.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup3.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.2 group + GroupResult falseIpSrcAddrGroup4 = falseIpSrcAddrGroups.get(3); + Assert.assertEquals("192.168.1.2", falseIpSrcAddrGroup4.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup4.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup4.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup4.getGroupResults()); + + // isAlert == false group + GroupResult trueGroup = isAlertGroups.get(1); + Assert.assertEquals(6, trueGroup.getTotal()); + Assert.assertEquals("ip_src_addr", trueGroup.getGroupedBy()); + List trueIpSrcAddrGroups = trueGroup.getGroupResults(); + Assert.assertEquals(4, trueIpSrcAddrGroups.size()); + + // isAlert == false && ip_src_addr == 192.168.1.5 group + GroupResult trueIpSrcAddrGroup1 = trueIpSrcAddrGroups.get(0); + Assert.assertEquals("192.168.1.5", trueIpSrcAddrGroup1.getKey()); + Assert.assertEquals(1, trueIpSrcAddrGroup1.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup1.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup1.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.4 group + GroupResult trueIpSrcAddrGroup2 = trueIpSrcAddrGroups.get(1); + Assert.assertEquals("192.168.1.4", trueIpSrcAddrGroup2.getKey()); + Assert.assertEquals(1, trueIpSrcAddrGroup2.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup2.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup2.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.3 group + GroupResult trueIpSrcAddrGroup3 = trueIpSrcAddrGroups.get(2); + Assert.assertEquals("192.168.1.3", trueIpSrcAddrGroup3.getKey()); + Assert.assertEquals(1, trueIpSrcAddrGroup3.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup3.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup3.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.1 group + GroupResult trueIpSrcAddrGroup4 = trueIpSrcAddrGroups.get(3); + Assert.assertEquals("192.168.1.1", trueIpSrcAddrGroup4.getKey()); + Assert.assertEquals(3, trueIpSrcAddrGroup4.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup4.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup4.getGroupResults()); + } + //Bad group query + { + GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class); + try { + dao.group(request); + Assert.fail("Exception expected, but did not come."); + } + catch(InvalidSearchException ise) { + Assert.assertEquals("Could not execute search", ise.getMessage()); + } + } } @AfterClass