Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions django_mongodb_backend/aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Variance,
)
from django.db.models.expressions import Case, Value, When
from django.db.models.functions.comparison import Coalesce
from django.db.models.lookups import IsNull
from django.db.models.sql.where import WhereNode

Expand Down Expand Up @@ -69,6 +70,9 @@ def count(self, compiler, connection, resolve_inner_expression=False):
if resolve_inner_expression:
return lhs_mql
return {"$sum": lhs_mql}
# Aggregation wrapping can inject an empty document in which this field is
# missing; coalesce to an empty list so that $size below always receives
# an array.
agg_expression = Coalesce(agg_expression, [])
# If distinct=True or resolve_inner_expression=False, sum the size of the
# set.
return {"$size": agg_expression.as_mql(compiler, connection, as_expr=True)}
Expand Down
23 changes: 8 additions & 15 deletions django_mongodb_backend/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def __init__(self, *args, **kwargs):
self.subqueries = []
# Atlas search stage.
self.search_pipeline = []
# Does the aggregation have no GROUP BY fields and need wrapping?
self.needs_wrap_aggregation = False
# The MQL equivalent to a SQL HAVING clause.
self.having_match_mql = None

def _get_group_alias_column(self, expr, annotation_group_idx):
"""Generate a dummy field for use in the ids fields in $group."""
Expand Down Expand Up @@ -234,21 +238,10 @@ def _build_aggregation_pipeline(self, ids, group):
"""Build the aggregation pipeline for grouping."""
pipeline = []
if not ids:
group["_id"] = None
pipeline.append({"$facet": {"group": [{"$group": group}]}})
pipeline.append(
{
"$addFields": {
key: {
"$getField": {
"input": {"$arrayElemAt": ["$group", 0]},
"field": key,
}
}
for key in group
}
}
)
pipeline.append({"$group": {"_id": None, **group}})
# The aggregation must be wrapped if there are no group by ids and
# no having clause.
self.needs_wrap_aggregation = not bool(self.having)
else:
group["_id"] = ids
pipeline.append({"$group": group})
Expand Down
42 changes: 14 additions & 28 deletions django_mongodb_backend/fields/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,37 +310,23 @@ class ArrayOverlap(ArrayRHSMixin, FieldGetDbPrepValueMixin, Lookup):

def get_subquery_wrapping_pipeline(self, compiler, connection, field_name, expr):
return [
{"$project": {"subquery_results": expr.as_mql(compiler, connection, as_expr=True)}},
{"$unwind": "$subquery_results"},
{
"$facet": {
"group": [
{"$project": {"tmp_name": expr.as_mql(compiler, connection, as_expr=True)}},
{
"$unwind": "$tmp_name",
},
{
"$group": {
"_id": None,
"tmp_name": {"$addToSet": "$tmp_name"},
}
},
]
}
},
{
"$project": {
field_name: {
"$ifNull": [
{
"$getField": {
"input": {"$arrayElemAt": ["$group", 0]},
"field": "tmp_name",
}
},
[],
]
}
"$group": {
"_id": None,
"subquery_results": {"$addToSet": "$subquery_results"},
}
},
# Workaround for https://jira.mongodb.org/browse/SERVER-114196:
# $$NOW becomes unavailable after $unionWith, so it must be stored
# beforehand to ensure it remains accessible later in the pipeline.
{"$addFields": {"__now": "$$NOW"}},
# Add an extra empty document to handle default values on empty
# results.
{"$unionWith": {"pipeline": [{"$documents": [{"subquery_results": []}]}]}},
{"$limit": 1},
{"$project": {field_name: "$subquery_results"}},
]

def as_mql_expr(self, compiler, connection):
Expand Down
54 changes: 19 additions & 35 deletions django_mongodb_backend/fields/embedded_model_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,44 +150,28 @@ def get_subquery_wrapping_pipeline(self, compiler, connection, field_name, expr)
# structure of EmbeddedModelArrayField on the RHS behaves similar to
# ArrayField.
return [
{"$project": {"subquery_results": expr.as_mql(compiler, connection, as_expr=True)}},
# Use an $unwind followed by a $group to concatenate all the values
# from the RHS subquery.
{"$unwind": "$subquery_results"},
# The $group stage collects values into an array using $addToSet.
# The use of {_id: null} results in a single grouped array, but
# because arrays from multiple documents are aggregated, the result
# is a list of lists.
{
"$facet": {
"gathered_data": [
{"$project": {"tmp_name": expr.as_mql(compiler, connection, as_expr=True)}},
# To concatenate all the values from the RHS subquery,
# use an $unwind followed by a $group.
{
"$unwind": "$tmp_name",
},
# The $group stage collects values into an array using
# $addToSet. The use of {_id: null} results in a
# single grouped array. However, because arrays from
# multiple documents are aggregated, the result is a
# list of lists.
{
"$group": {
"_id": None,
"tmp_name": {"$addToSet": "$tmp_name"},
}
},
]
}
},
{
"$project": {
field_name: {
"$ifNull": [
{
"$getField": {
"input": {"$arrayElemAt": ["$gathered_data", 0]},
"field": "tmp_name",
}
},
[],
]
}
"$group": {
"_id": None,
"subquery_results": {"$addToSet": "$subquery_results"},
}
},
# Workaround for https://jira.mongodb.org/browse/SERVER-114196:
# $$NOW becomes unavailable after $unionWith, so it must be stored
# beforehand to ensure it remains accessible later in the pipeline.
{"$addFields": {"__now": "$$NOW"}},
# Add a dummy document in case of an empty result.
{"$unionWith": {"pipeline": [{"$documents": [{"subquery_results": []}]}]}},
{"$limit": 1},
{"$project": {field_name: "$subquery_results"}},
]


Expand Down
39 changes: 13 additions & 26 deletions django_mongodb_backend/lookups.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,34 +56,21 @@ def inner(self, compiler, connection):
def get_subquery_wrapping_pipeline(self, compiler, connection, field_name, expr): # noqa: ARG001
return [
{
"$facet": {
"group": [
{
"$group": {
"_id": None,
"tmp_name": {
"$addToSet": expr.as_mql(compiler, connection, as_expr=True)
},
}
}
]
}
},
{
"$project": {
field_name: {
"$ifNull": [
{
"$getField": {
"input": {"$arrayElemAt": ["$group", 0]},
"field": "tmp_name",
}
},
[],
]
}
"$group": {
"_id": None,
# Use a temporary name to support field_name="_id".
"subquery_results": {"$addToSet": expr.as_mql(compiler, connection, as_expr=True)},
}
},
# Workaround for https://jira.mongodb.org/browse/SERVER-114196:
# $$NOW becomes unavailable after $unionWith, so it must be stored
# beforehand to ensure it remains accessible later in the pipeline.
{"$addFields": {"__now": "$$NOW"}},
# Add an extra empty document to handle default values on empty
# results.
{"$unionWith": {"pipeline": [{"$documents": [{"subquery_results": []}]}]}},
{"$limit": 1},
{"$project": {field_name: "$subquery_results"}},
]


Expand Down
20 changes: 20 additions & 0 deletions django_mongodb_backend/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, compiler):
# $lookup stage that encapsulates the pipeline for performing a nested
# subquery.
self.subquery_lookup = None
self.needs_wrap_aggregation = compiler.needs_wrap_aggregation

def __repr__(self):
return f"<MongoQuery: {self.match_mql!r} ORDER {self.ordering!r}>"
Expand Down Expand Up @@ -91,6 +92,25 @@ def get_pipeline(self):
pipeline.append({"$match": self.match_mql})
if self.aggregation_pipeline:
pipeline.extend(self.aggregation_pipeline)
if self.needs_wrap_aggregation:
# Add the aggregation stage for queries without a GROUP BY.
# e.g. SQL equivalent of SELECT avg(col) FROM table
pipeline.extend(
[
# Workaround for https://jira.mongodb.org/browse/SERVER-114196:
# $$NOW becomes unavailable after $unionWith, so it must be
# stored beforehand to ensure it remains accessible later
# in the pipeline.
{"$addFields": {"__now": "$$NOW"}},
# Add an empty extra document to handle default values on
# empty results.
{"$unionWith": {"pipeline": [{"$documents": [{}]}]}},
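# Limiting to one document keeps the original result when present;
# otherwise the injected empty document is used. This relies on
# $unionWith emitting the upstream documents before the injected one.
# NOTE(review): MongoDB documents the $unionWith output order as
# unspecified - confirm this ordering is safe to depend on.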
{"$limit": 1},
]
)
if self.project_fields:
pipeline.append({"$project": self.project_fields})
if self.combinator_pipeline:
Expand Down
5 changes: 5 additions & 0 deletions docs/releases/6.0.x.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Bug fixes

- ...

Performance improvements
------------------------

- Removed usage of ``$facet`` from aggregate queries and from the pipelines
  that wrap subqueries used as the right-hand side of lookups.

6.0.0
=====

Expand Down
25 changes: 6 additions & 19 deletions tests/lookup_/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,15 @@ def test_subquery_filter_constant(self):
"let": {},
"pipeline": [
{"$match": {"num": {"$gt": 2}}},
{"$group": {"_id": None, "subquery_results": {"$addToSet": "$num"}}},
{"$addFields": {"__now": "$$NOW"}},
{
"$facet": {
"group": [
{"$group": {"_id": None, "tmp_name": {"$addToSet": "$num"}}}
]
}
},
{
"$project": {
"num": {
"$ifNull": [
{
"$getField": {
"input": {"$arrayElemAt": ["$group", 0]},
"field": "tmp_name",
}
},
[],
]
}
"$unionWith": {
"pipeline": [{"$documents": [{"subquery_results": []}]}]
}
},
{"$limit": 1},
{"$project": {"num": "$subquery_results"}},
],
}
},
Expand Down
15 changes: 15 additions & 0 deletions tests/model_fields_/test_arrayfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,21 @@ def test_overlap_values(self):
self.objs[:3],
)

def test_overlap_empty_values(self):
    """
    An ``overlap`` lookup whose right-hand side is a subquery filtered on
    ``order__lt=-30`` returns no objects, for both ``values_list()`` and
    ``values()`` forms of the subquery.
    """
    # Presumably no fixture row has order < -30, making the subquery empty;
    # the fixture data is not visible from this test alone.
    empty_qs = NullableIntegerArrayModel.objects.filter(order__lt=-30)
    for rhs in (empty_qs.values_list("field"), empty_qs.values("field")):
        matches = NullableIntegerArrayModel.objects.filter(field__overlap=rhs)
        self.assertCountEqual(matches, [])

def test_index(self):
self.assertSequenceEqual(
NullableIntegerArrayModel.objects.filter(field__0=2), self.objs[1:3]
Expand Down
5 changes: 5 additions & 0 deletions tests/model_fields_/test_embedded_model_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ def test_subquery_in_lookup(self):
result = Exhibit.objects.filter(sections__number__in=subquery)
self.assertCountEqual(result, [self.wonders, self.new_discoveries, self.egypt])

def test_subquery_empty_in_lookup(self):
    """
    Filtering exhibits with an ``in`` lookup against the audit subquery for
    ``section_number=10`` yields no exhibits.
    """
    audit_numbers = Audit.objects.filter(section_number=10).values_list(
        "section_number", flat=True
    )
    exhibits = Exhibit.objects.filter(sections__number__in=audit_numbers)
    self.assertCountEqual(exhibits, [])

def test_array_as_rhs(self):
result = Exhibit.objects.filter(main_section__number__in=models.F("sections__number"))
self.assertCountEqual(result, [self.new_discoveries])
Expand Down