Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ public enum BuiltinFunctionName {
.put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP)
// .put("earliest", BuiltinFunctionName.EARLIEST)
// .put("latest", BuiltinFunctionName.LATEST)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
.build();

Expand Down
24 changes: 24 additions & 0 deletions docs/user/ppl/cmd/eventstats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ Example::
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+


DISTINCT_COUNT, DC(Since 3.3)
-----------------------------

Description
>>>>>>>>>>>

Usage: DISTINCT_COUNT(expr), DC(expr). Returns the approximate number of distinct values of expr, computed with the HyperLogLog++ algorithm. ``DISTINCT_COUNT`` and ``DC`` are equivalent and provide the same functionality.

Example::

PPL> source=accounts | eventstats dc(state) as distinct_states, distinct_count(state) as dc_states_alt by gender;
fetched rows / total rows = 4/4
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | distinct_states | dc_states_alt |
|----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------|
| 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | 1 |
| 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3 | 3 |
| 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3 | 3 |
| 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3 | 3 |
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+


Configuration
=============
This command requires Calcite enabled.
Expand Down Expand Up @@ -312,6 +334,8 @@ Eventstats::
source = table | where a < 50 | eventstats count(c)
source = table | eventstats min(c), max(c) by b
source = table | eventstats count(c) as count_by by b | where count_by > 1000
source = table | eventstats dc(field) as distinct_values
source = table | eventstats distinct_count(category) by region


Example 1: Calculate the average, sum and count of a field by group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,27 @@ public void supportPushDownScriptOnTextField() throws IOException {
assertJsonEqualsIgnoreId(expected, result);
}

/**
 * Compares the explain output of {@code eventstats dc(state) as distinct_states} with the plan
 * stored in {@code expectedOutput/calcite/explain_eventstats_dc.json}. The assumption guards the
 * test so that it only proceeds when push down is enabled.
 */
@Test
public void testEventstatsDistinctCountExplain() throws IOException {
  Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
  var actualPlan =
      explainQueryToString(
          "source=opensearch-sql_test_index_account | eventstats dc(state) as distinct_states");
  String expectedPlan = loadFromFile("expectedOutput/calcite/explain_eventstats_dc.json");
  assertJsonEqualsIgnoreId(expectedPlan, actualPlan);
}

/**
 * Compares the explain output of {@code eventstats distinct_count(state) as distinct_states by
 * gender} with the plan stored in {@code
 * expectedOutput/calcite/explain_eventstats_distinct_count.json}. The assumption guards the test
 * so that it only proceeds when push down is enabled.
 */
@Test
public void testEventstatsDistinctCountFunctionExplain() throws IOException {
  Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
  var actualPlan =
      explainQueryToString(
          "source=opensearch-sql_test_index_account | eventstats distinct_count(state) as"
              + " distinct_states by gender");
  String expectedPlan =
      loadFromFile("expectedOutput/calcite/explain_eventstats_distinct_count.json");
  assertJsonEqualsIgnoreId(expectedPlan, actualPlan);
}

// Only for Calcite, as v2 gets unstable serialized string for function
@Test
public void testExplainOnAggregationWithSumEnhancement() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
import org.opensearch.sql.legacy.TestsConstants;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalcitePPLEventstatsIT extends PPLIntegTestCase {
Expand Down Expand Up @@ -290,28 +288,6 @@ public void testUnsupportedWindowFunctions() {
}
}

// Adds one extra document ("Jim", Ontario, Canada) to the state/country test index, then runs
// `eventstats distinct_count(state) by country` and checks the returned rows.
// NOTE(review): only @Ignore is visible on this method, no @Test -- confirm whether the test
// runner would pick it up at all once the ignore is lifted.
// NOTE(review): the method name and ignore reason say the query should fail, yet the body asserts
// on data rows rather than on an error -- confirm the intended contract.
@Ignore("DC should fail in window function")
public void testDistinctCountShouldFail() throws IOException {
// PUT document id 5 into the index; the request URL carries refresh=true.
Request request1 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
request1.setJsonEntity(
"{\"name\":\"Jim\",\"age\":27,\"state\":\"Ontario\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request1);
JSONObject actual =
executeQuery(
String.format(
"source=%s | eventstats distinct_count(state) by country",
TEST_INDEX_STATE_COUNTRY));

// The last value of each row is the expected per-country result.
// NOTE(review): the Canada rows expect 3, but the three Canada rows listed here carry only two
// distinct states (Ontario, Quebec) -- TODO confirm the expected value.
verifyDataRows(
actual,
rows("John", "Canada", "Ontario", 4, 2023, 25, 3),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3),
rows("Jim", "Canada", "Ontario", 4, 2023, 27, 3),
rows("Jake", "USA", "California", 4, 2023, 70, 2),
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

@Test
public void testMultipleEventstat() throws IOException {
JSONObject actual =
Expand Down Expand Up @@ -599,6 +575,111 @@ public void testEventstatVarianceWithNullBy() throws IOException {
rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800));
}

/**
 * Runs {@code eventstats dc(state) as dc_state} without a by-clause and expects a trailing {@code
 * dc_state} column of type bigint whose value is 4 on each of the four rows.
 */
@Test
public void testEventstatDistinctCount() throws IOException {
  String ppl =
      String.format("source=%s | eventstats dc(state) as dc_state", TEST_INDEX_STATE_COUNTRY);
  JSONObject response = executeQuery(ppl);

  // The original six columns come first, followed by the new aggregate column.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_state", "bigint"));

  verifyDataRows(
      response,
      rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
      rows("Jake", "USA", "California", 4, 2023, 70, 4),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
      rows("Hello", "USA", "New York", 4, 2023, 30, 4));
}

/**
 * Runs {@code eventstats dc(state) as dc_state by country} and expects a trailing {@code
 * dc_state} column of type bigint whose value is 2 on every row, for both the Canada and the USA
 * rows.
 */
@Test
public void testEventstatDistinctCountByCountry() throws IOException {
  String ppl =
      String.format(
          "source=%s | eventstats dc(state) as dc_state by country", TEST_INDEX_STATE_COUNTRY);
  JSONObject response = executeQuery(ppl);

  // The original six columns come first, followed by the new aggregate column.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_state", "bigint"));

  verifyDataRows(
      response,
      rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
      rows("Jake", "USA", "California", 4, 2023, 70, 2),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
      rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

/**
 * Runs {@code eventstats distinct_count(country) as dc_country} (the long-form function name)
 * and expects a trailing {@code dc_country} column of type bigint whose value is 2 on each of the
 * four rows.
 */
@Test
public void testEventstatDistinctCountFunction() throws IOException {
  String ppl =
      String.format(
          "source=%s | eventstats distinct_count(country) as dc_country",
          TEST_INDEX_STATE_COUNTRY);
  JSONObject response = executeQuery(ppl);

  // The original six columns come first, followed by the new aggregate column.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_country", "bigint"));

  verifyDataRows(
      response,
      rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
      rows("Jake", "USA", "California", 4, 2023, 70, 2),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
      rows("Hello", "USA", "New York", 4, 2023, 30, 2));
}

/**
 * Runs {@code eventstats dc(state) as dc_state} against the index referenced by {@code
 * TEST_INDEX_STATE_COUNTRY_WITH_NULL}. The expected {@code dc_state} is 4 on all six rows,
 * including the two rows whose {@code state} is null.
 */
@Test
public void testEventstatDistinctCountWithNull() throws IOException {
  String ppl =
      String.format(
          "source=%s | eventstats dc(state) as dc_state", TEST_INDEX_STATE_COUNTRY_WITH_NULL);
  JSONObject response = executeQuery(ppl);

  // The original six columns come first, followed by the new aggregate column.
  verifySchemaInOrder(
      response,
      schema("name", "string"),
      schema("country", "string"),
      schema("state", "string"),
      schema("month", "int"),
      schema("year", "int"),
      schema("age", "int"),
      schema("dc_state", "bigint"));

  verifyDataRows(
      response,
      rows(null, "Canada", null, 4, 2023, 10, 4),
      rows("Kevin", null, null, 4, 2023, null, 4),
      rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
      rows("Jake", "USA", "California", 4, 2023, 70, 4),
      rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
      rows("Hello", "USA", "New York", 4, 2023, 30, 4));
}

@Ignore
@Test
public void testEventstatEarliestAndLatest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not pushed down. Reusing existing implementation may solve this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuancu You're right that it's not being pushed down. However, removing the registration in PPLFuncImpTable won't solve the push-down issue. The CalciteExplainIT goes through the external OpenSearch execution path and would use the existing implementation in OpenSearchExecutionEngine.java

Copy link
Collaborator

@yuancu yuancu Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stats dc(field) can be pushed down -- source=opensearch-sql_test_index_account | stats dc(age) as unique_age is explained as the following plan:

{
  "calcite": {
    "logical": """LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
  LogicalAggregate(group=[{}], unique_age=[COUNT(DISTINCT $0)])
    LogicalProject(age=[$8])
      CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
""",
    "physical": """EnumerableLimit(fetch=[10000])
  CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#22:LogicalAggregate.NONE.[](input=RelSubset#21,group={},unique_age=COUNT(DISTINCT $0))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"unique_age":{"cardinality":{"field":"age"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
"""
  }
}

You can imitate how stats dc(field) is pushed down to enable the same for eventstats dc(field).

Update: I found that all of the existing eventstats agg_function(field) are not pushed down. Maybe it's hard to push down window functions..

}
}
2 changes: 2 additions & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ scalarWindowFunctionName
| LAST
| NTH
| NTILE
| DISTINCT_COUNT
| DC
;

// aggregation terms
Expand Down
Loading