diff --git a/firestore/google/cloud/firestore_v1/client.py b/firestore/google/cloud/firestore_v1/client.py index 8c7c3f660807..06ca37e6d819 100644 --- a/firestore/google/cloud/firestore_v1/client.py +++ b/firestore/google/cloud/firestore_v1/client.py @@ -26,6 +26,7 @@ from google.cloud.client import ClientWithProject from google.cloud.firestore_v1 import _helpers +from google.cloud.firestore_v1 import query from google.cloud.firestore_v1 import types from google.cloud.firestore_v1.batch import WriteBatch from google.cloud.firestore_v1.collection import CollectionReference @@ -179,6 +180,31 @@ def collection(self, *collection_path): return CollectionReference(*path, client=self) + def collection_group(self, collection_id): + """ + Creates and returns a new Query that includes all documents in the + database that are contained in a collection or subcollection with the + given collection_id. + + .. code-block:: python + + >>> query = firestore.collection_group('mygroup') + + @param {string} collectionId Identifies the collections to query over. + Every collection or subcollection with this ID as the last segment of its + path will be included. Cannot contain a slash. + @returns {Query} The created Query. + """ + if "/" in collection_id: + raise ValueError( + "Invalid collection_id " + + collection_id + + ". Collection IDs must not contain '/'." + ) + + collection = self.collection(collection_id) + return query.Query(collection, all_descendants=True) + def document(self, *document_path): """Get a reference to a document in a collection. @@ -215,6 +241,13 @@ def document(self, *document_path): else: path = document_path + # DocumentReference takes a relative path. Strip the database string if present. + base_path = self._database_string + "/documents/" + joined_path = _helpers.DOCUMENT_PATH_DELIMITER.join(path) + if joined_path.startswith(base_path): + joined_path = joined_path[len(base_path) :] + path = joined_path.split(_helpers.DOCUMENT_PATH_DELIMITER) + return DocumentReference(*path, client=self) @staticmethod diff --git a/firestore/google/cloud/firestore_v1/query.py b/firestore/google/cloud/firestore_v1/query.py index 6c6239989e8f..12141cc806b5 100644 --- a/firestore/google/cloud/firestore_v1/query.py +++ b/firestore/google/cloud/firestore_v1/query.py @@ -111,6 +111,10 @@ class Query(object): any matching documents will be included in the result set. When the query is formed, the document values will be used in the order given by ``orders``. + all_descendants (Optional[bool]): When false, selects only collections + that are immediate children of the `parent` specified in the + containing `RunQueryRequest`. When true, selects all descendant + collections. """ ASCENDING = "ASCENDING" @@ -128,6 +132,7 @@ def __init__( offset=None, start_at=None, end_at=None, + all_descendants=False, ): self._parent = parent self._projection = projection @@ -137,6 +142,7 @@ def __init__( self._offset = offset self._start_at = start_at self._end_at = end_at + self._all_descendants = all_descendants def __eq__(self, other): if not isinstance(other, self.__class__): @@ -150,6 +156,7 @@ def __eq__(self, other): and self._offset == other._offset and self._start_at == other._start_at and self._end_at == other._end_at + and self._all_descendants == other._all_descendants ) @property @@ -203,6 +210,7 @@ def select(self, field_paths): offset=self._offset, start_at=self._start_at, end_at=self._end_at, + all_descendants=self._all_descendants, ) def where(self, field_path, op_string, value): @@ -270,6 +278,7 @@ def where(self, field_path, op_string, value): offset=self._offset, start_at=self._start_at, end_at=self._end_at, + all_descendants=self._all_descendants, ) @staticmethod @@ -321,6 +330,7 @@ def order_by(self, field_path, direction=ASCENDING): offset=self._offset, start_at=self._start_at, end_at=self._end_at, + all_descendants=self._all_descendants, ) def limit(self, count): @@ -346,6 +356,7 @@ def limit(self, count): offset=self._offset, start_at=self._start_at, end_at=self._end_at, + all_descendants=self._all_descendants, ) def offset(self, num_to_skip): @@ -372,6 +383,7 @@ def offset(self, num_to_skip): offset=num_to_skip, start_at=self._start_at, end_at=self._end_at, + all_descendants=self._all_descendants, ) def _cursor_helper(self, document_fields, before, start): @@ -418,6 +430,7 @@ def _cursor_helper(self, document_fields, before, start): "orders": self._orders, "limit": self._limit, "offset": self._offset, + "all_descendants": self._all_descendants, } if start: query_kwargs["start_at"] = cursor_pair @@ -679,7 +692,7 @@ def _to_protobuf(self): "select": projection, "from": [ query_pb2.StructuredQuery.CollectionSelector( - collection_id=self._parent.id + collection_id=self._parent.id, all_descendants=self._all_descendants ) ], "where": self._filters_pb(), @@ -739,9 +752,14 @@ def stream(self, transaction=None): ) for response in response_iterator: - snapshot = _query_response_to_snapshot( - response, self._parent, expected_prefix - ) + if self._all_descendants: + snapshot = _collection_group_query_response_to_snapshot( + response, self._parent + ) + else: + snapshot = _query_response_to_snapshot( + response, self._parent, expected_prefix + ) if snapshot is not None: yield snapshot @@ -968,3 +986,32 @@ def _query_response_to_snapshot(response_pb, collection, expected_prefix): update_time=response_pb.document.update_time, ) return snapshot + + +def _collection_group_query_response_to_snapshot(response_pb, collection): + """Parse a query response protobuf to a document snapshot. + + Args: + response_pb (google.cloud.proto.firestore.v1.\ + firestore_pb2.RunQueryResponse): A + collection (~.firestore_v1.collection.CollectionReference): A + reference to the collection that initiated the query. + + Returns: + Optional[~.firestore.document.DocumentSnapshot]: A + snapshot of the data returned in the query. If ``response_pb.document`` + is not set, the snapshot will be :data:`None`. + """ + if not response_pb.HasField("document"): + return None + reference = collection._client.document(response_pb.document.name) + data = _helpers.decode_dict(response_pb.document.fields, collection._client) + snapshot = document.DocumentSnapshot( + reference, + data, + exists=True, + read_time=response_pb.read_time, + create_time=response_pb.document.create_time, + update_time=response_pb.document.update_time, + ) + return snapshot diff --git a/firestore/tests/system.py b/firestore/tests/system.py index 75ae3fb2d4c6..a8e683629add 100644 --- a/firestore/tests/system.py +++ b/firestore/tests/system.py @@ -634,6 +634,120 @@ def test_query_unary(client, cleanup): assert math.isnan(data1[field_name]) +def test_collection_group_queries(client, cleanup): + collection_group = "b" + unique_resource_id("-") + + doc_paths = [ + "abc/123/" + collection_group + "/cg-doc1", + "abc/123/" + collection_group + "/cg-doc2", + collection_group + "/cg-doc3", + collection_group + "/cg-doc4", + "def/456/" + collection_group + "/cg-doc5", + collection_group + "/virtual-doc/nested-coll/not-cg-doc", + "x" + collection_group + "/not-cg-doc", + collection_group + "x/not-cg-doc", + "abc/123/" + collection_group + "x/not-cg-doc", + "abc/123/x" + collection_group + "/not-cg-doc", + "abc/" + collection_group, + ] + + batch = client.batch() + for doc_path in doc_paths: + doc_ref = client.document(doc_path) + batch.set(doc_ref, {"x": 1}) + + batch.commit() + + query = client.collection_group(collection_group) + snapshots = list(query.stream()) + found = [snapshot.id for snapshot in snapshots] + expected = ["cg-doc1", "cg-doc2", "cg-doc3", "cg-doc4", "cg-doc5"] + assert found == expected + + +def test_collection_group_queries_startat_endat(client, cleanup): + collection_group = "b" + unique_resource_id("-") + + doc_paths = [ + "a/a/" + collection_group + "/cg-doc1", + "a/b/a/b/" + collection_group + "/cg-doc2", + "a/b/" + collection_group + "/cg-doc3", + "a/b/c/d/" + collection_group + "/cg-doc4", + "a/c/" + collection_group + "/cg-doc5", + collection_group + "/cg-doc6", + "a/b/nope/nope", + ] + + batch = client.batch() + for doc_path in doc_paths: + doc_ref = client.document(doc_path) + batch.set(doc_ref, {"x": doc_path}) + + batch.commit() + + query = ( + client.collection_group(collection_group) + .order_by("__name__") + .start_at([client.document("a/b")]) + .end_at([client.document("a/b0")]) + ) + snapshots = list(query.stream()) + found = set(snapshot.id for snapshot in snapshots) + assert found == set(["cg-doc2", "cg-doc3", "cg-doc4"]) + + query = ( + client.collection_group(collection_group) + .order_by("__name__") + .start_after([client.document("a/b")]) + .end_before([client.document("a/b/" + collection_group + "/cg-doc3")]) + ) + snapshots = list(query.stream()) + found = set(snapshot.id for snapshot in snapshots) + assert found == set(["cg-doc2"]) + + +def test_collection_group_queries_filters(client, cleanup): + collection_group = "b" + unique_resource_id("-") + + doc_paths = [ + "a/a/" + collection_group + "/cg-doc1", + "a/b/a/b/" + collection_group + "/cg-doc2", + "a/b/" + collection_group + "/cg-doc3", + "a/b/c/d/" + collection_group + "/cg-doc4", + "a/c/" + collection_group + "/cg-doc5", + collection_group + "/cg-doc6", + "a/b/nope/nope", + ] + + batch = client.batch() + + for index, doc_path in enumerate(doc_paths): + doc_ref = client.document(doc_path) + batch.set(doc_ref, {"x": index}) + + batch.commit() + + query = ( + client.collection_group(collection_group) + .where("__name__", ">=", client.document("a/b")) + .where("__name__", "<=", client.document("a/b0")) + ) + snapshots = list(query.stream()) + found = set(snapshot.id for snapshot in snapshots) + assert found == set(["cg-doc2", "cg-doc3", "cg-doc4"]) + + query = ( + client.collection_group(collection_group) + .where("__name__", ">", client.document("a/b")) + .where( + "__name__", "<", client.document("a/b/{}/cg-doc3".format(collection_group)) + ) + ) + snapshots = list(query.stream()) + found = set(snapshot.id for snapshot in snapshots) + assert found == set(["cg-doc2"]) + + def test_get_all(client, cleanup): collection_name = "get-all" + unique_resource_id("-") diff --git a/firestore/tests/unit/v1/test_client.py b/firestore/tests/unit/v1/test_client.py index 968d13487249..fb82b1f9d9bb 100644 --- a/firestore/tests/unit/v1/test_client.py +++ b/firestore/tests/unit/v1/test_client.py @@ -130,6 +130,21 @@ def test_collection_factory_nested(self): self.assertIs(collection2._client, client) self.assertIsInstance(collection2, CollectionReference) + def test_collection_group(self): + client = self._make_default_one() + query = client.collection_group("collectionId").where("foo", "==", u"bar") + + assert query._all_descendants + assert query._field_filters[0].field.field_path == "foo" + assert query._field_filters[0].value.string_value == u"bar" + assert query._field_filters[0].op == query._field_filters[0].EQUAL + assert query._parent.id == "collectionId" + + def test_collection_group_no_slashes(self): + client = self._make_default_one() + with self.assertRaises(ValueError): + client.collection_group("foo/bar") + def test_document_factory(self): from google.cloud.firestore_v1.document import DocumentReference @@ -148,7 +163,20 @@ def test_document_factory(self): self.assertIs(document2._client, client) self.assertIsInstance(document2, DocumentReference) - def test_document_factory_nested(self): + def test_document_factory_w_absolute_path(self): + from google.cloud.firestore_v1.document import DocumentReference + + parts = ("rooms", "roomA") + client = self._make_default_one() + doc_path = "/".join(parts) + to_match = client.document(doc_path) + document1 = client.document(to_match._document_path) + + self.assertEqual(document1._path, parts) + self.assertIs(document1._client, client) + self.assertIsInstance(document1, DocumentReference) + + def test_document_factory_w_nested_path(self): from google.cloud.firestore_v1.document import DocumentReference client = self._make_default_one() diff --git a/firestore/tests/unit/v1/test_query.py b/firestore/tests/unit/v1/test_query.py index c67c053c7765..eada98cd192a 100644 --- a/firestore/tests/unit/v1/test_query.py +++ b/firestore/tests/unit/v1/test_query.py @@ -45,8 +45,11 @@ def test_constructor_defaults(self): self.assertIsNone(query._offset) self.assertIsNone(query._start_at) self.assertIsNone(query._end_at) + self.assertFalse(query._all_descendants) - def _make_one_all_fields(self, limit=9876, offset=12, skip_fields=(), parent=None): + def _make_one_all_fields( + self, limit=9876, offset=12, skip_fields=(), parent=None, all_descendants=True + ): kwargs = { "projection": mock.sentinel.projection, "field_filters": mock.sentinel.filters, @@ -55,6 +58,7 @@ def _make_one_all_fields(self, limit=9876, offset=12, skip_fields=(), parent=Non "offset": offset, "start_at": mock.sentinel.start_at, "end_at": mock.sentinel.end_at, + "all_descendants": all_descendants, } for field in skip_fields: kwargs.pop(field) @@ -74,6 +78,7 @@ def test_constructor_explicit(self): self.assertEqual(query._offset, offset) self.assertIs(query._start_at, mock.sentinel.start_at) self.assertIs(query._end_at, mock.sentinel.end_at) + self.assertTrue(query._all_descendants) def test__client_property(self): parent = mock.Mock(_client=mock.sentinel.client, spec=["_client"]) @@ -81,75 +86,79 @@ def test__client_property(self): self.assertIs(query._client, mock.sentinel.client) def test___eq___other_type(self): - client = self._make_one_all_fields() + query = self._make_one_all_fields() other = object() - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_parent(self): parent = mock.sentinel.parent other_parent = mock.sentinel.other_parent - client = self._make_one_all_fields(parent=parent) + query = self._make_one_all_fields(parent=parent) other = self._make_one_all_fields(parent=other_parent) - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_projection(self): parent = mock.sentinel.parent - client = self._make_one_all_fields(parent=parent, skip_fields=("projection",)) - client._projection = mock.sentinel.projection + query = self._make_one_all_fields(parent=parent, skip_fields=("projection",)) + query._projection = mock.sentinel.projection other = self._make_one_all_fields(parent=parent, skip_fields=("projection",)) other._projection = mock.sentinel.other_projection - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_field_filters(self): parent = mock.sentinel.parent - client = self._make_one_all_fields( - parent=parent, skip_fields=("field_filters",) - ) - client._field_filters = mock.sentinel.field_filters + query = self._make_one_all_fields(parent=parent, skip_fields=("field_filters",)) + query._field_filters = mock.sentinel.field_filters other = self._make_one_all_fields(parent=parent, skip_fields=("field_filters",)) other._field_filters = mock.sentinel.other_field_filters - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_orders(self): parent = mock.sentinel.parent - client = self._make_one_all_fields(parent=parent, skip_fields=("orders",)) - client._orders = mock.sentinel.orders + query = self._make_one_all_fields(parent=parent, skip_fields=("orders",)) + query._orders = mock.sentinel.orders other = self._make_one_all_fields(parent=parent, skip_fields=("orders",)) other._orders = mock.sentinel.other_orders - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_limit(self): parent = mock.sentinel.parent - client = self._make_one_all_fields(parent=parent, limit=10) + query = self._make_one_all_fields(parent=parent, limit=10) other = self._make_one_all_fields(parent=parent, limit=20) - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_offset(self): parent = mock.sentinel.parent - client = self._make_one_all_fields(parent=parent, offset=10) + query = self._make_one_all_fields(parent=parent, offset=10) other = self._make_one_all_fields(parent=parent, offset=20) - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_start_at(self): parent = mock.sentinel.parent - client = self._make_one_all_fields(parent=parent, skip_fields=("start_at",)) - client._start_at = mock.sentinel.start_at + query = self._make_one_all_fields(parent=parent, skip_fields=("start_at",)) + query._start_at = mock.sentinel.start_at other = self._make_one_all_fields(parent=parent, skip_fields=("start_at",)) other._start_at = mock.sentinel.other_start_at - self.assertFalse(client == other) + self.assertFalse(query == other) def test___eq___different_end_at(self): parent = mock.sentinel.parent - client = self._make_one_all_fields(parent=parent, skip_fields=("end_at",)) - client._end_at = mock.sentinel.end_at + query = self._make_one_all_fields(parent=parent, skip_fields=("end_at",)) + query._end_at = mock.sentinel.end_at other = self._make_one_all_fields(parent=parent, skip_fields=("end_at",)) other._end_at = mock.sentinel.other_end_at - self.assertFalse(client == other) + self.assertFalse(query == other) + + def test___eq___different_all_descendants(self): + parent = mock.sentinel.parent + query = self._make_one_all_fields(parent=parent, all_descendants=True) + other = self._make_one_all_fields(parent=parent, all_descendants=False) + self.assertFalse(query == other) def test___eq___hit(self): - client = self._make_one_all_fields() + query = self._make_one_all_fields() other = self._make_one_all_fields() - self.assertTrue(client == other) + self.assertTrue(query == other) def _compare_queries(self, query1, query2, attr_name): attrs1 = query1.__dict__.copy() @@ -181,7 +190,7 @@ def test_select_invalid_path(self): query.select(["*"]) def test_select(self): - query1 = self._make_one_all_fields() + query1 = self._make_one_all_fields(all_descendants=True) field_paths2 = ["foo", "bar"] query2 = query1.select(field_paths2) @@ -213,7 +222,9 @@ def test_where(self): from google.cloud.firestore_v1.proto import document_pb2 from google.cloud.firestore_v1.proto import query_pb2 - query = self._make_one_all_fields(skip_fields=("field_filters",)) + query = self._make_one_all_fields( + skip_fields=("field_filters",), all_descendants=True + ) new_query = query.where("power.level", ">", 9000) self.assertIsNot(query, new_query) @@ -302,7 +313,9 @@ def test_order_by(self): from google.cloud.firestore_v1.gapic import enums klass = self._get_target_class() - query1 = self._make_one_all_fields(skip_fields=("orders",)) + query1 = self._make_one_all_fields( + skip_fields=("orders",), all_descendants=True + ) field_path2 = "a" query2 = query1.order_by(field_path2) @@ -326,7 +339,7 @@ def test_order_by(self): self._compare_queries(query2, query3, "_orders") def test_limit(self): - query1 = self._make_one_all_fields() + query1 = self._make_one_all_fields(all_descendants=True) limit2 = 100 query2 = query1.limit(limit2) @@ -344,7 +357,7 @@ def test_limit(self): self._compare_queries(query2, query3, "_limit") def test_offset(self): - query1 = self._make_one_all_fields() + query1 = self._make_one_all_fields(all_descendants=True) offset2 = 23 query2 = query1.offset(offset2) @@ -382,6 +395,7 @@ def _make_snapshot(docref, values): def test__cursor_helper_w_dict(self): values = {"a": 7, "b": "foo"} query1 = self._make_one(mock.sentinel.parent) + query1._all_descendants = True query2 = query1._cursor_helper(values, True, True) self.assertIs(query2._parent, mock.sentinel.parent) @@ -391,6 +405,7 @@ def test__cursor_helper_w_dict(self): self.assertIsNone(query2._limit) self.assertIsNone(query2._offset) self.assertIsNone(query2._end_at) + self.assertTrue(query2._all_descendants) cursor, before = query2._start_at @@ -468,7 +483,9 @@ def test__cursor_helper_w_snapshot(self): def test_start_at(self): collection = self._make_collection("here") - query1 = self._make_one_all_fields(parent=collection, skip_fields=("orders",)) + query1 = self._make_one_all_fields( + parent=collection, skip_fields=("orders",), all_descendants=True + ) query2 = query1.order_by("hi") document_fields3 = {"hi": "mom"} @@ -1270,6 +1287,47 @@ def test_stream_empty_after_first_response(self): metadata=client._rpc_metadata, ) + def test_stream_w_collection_group(self): + # Create a minimal fake GAPIC. + firestore_api = mock.Mock(spec=["run_query"]) + + # Attach the fake GAPIC to a real client. + client = _make_client() + client._firestore_api_internal = firestore_api + + # Make a **real** collection reference as parent. + parent = client.collection("charles") + other = client.collection("dora") + + # Add two dummy responses to the minimal fake GAPIC. + _, other_prefix = other._parent_info() + name = "{}/bark".format(other_prefix) + data = {"lee": "hoop"} + response_pb1 = _make_query_response(name=name, data=data) + response_pb2 = _make_query_response() + firestore_api.run_query.return_value = iter([response_pb1, response_pb2]) + + # Execute the query and check the response. + query = self._make_one(parent) + query._all_descendants = True + get_response = query.stream() + self.assertIsInstance(get_response, types.GeneratorType) + returned = list(get_response) + self.assertEqual(len(returned), 1) + snapshot = returned[0] + to_match = other.document("bark") + self.assertEqual(snapshot.reference._document_path, to_match._document_path) + self.assertEqual(snapshot.to_dict(), data) + + # Verify the mock call. + parent_path, _ = parent._parent_info() + firestore_api.run_query.assert_called_once_with( + parent_path, + query._to_protobuf(), + transaction=None, + metadata=client._rpc_metadata, + ) + @mock.patch("google.cloud.firestore_v1.query.Watch", autospec=True) def test_on_snapshot(self, watch): query = self._make_one(mock.sentinel.parent) @@ -1537,6 +1595,46 @@ def test_response(self): self.assertEqual(snapshot.update_time, response_pb.document.update_time) +class Test__collection_group_query_response_to_snapshot(unittest.TestCase): + @staticmethod + def _call_fut(response_pb, collection): + from google.cloud.firestore_v1.query import ( + _collection_group_query_response_to_snapshot, + ) + + return _collection_group_query_response_to_snapshot(response_pb, collection) + + def test_empty(self): + response_pb = _make_query_response() + snapshot = self._call_fut(response_pb, None) + self.assertIsNone(snapshot) + + def test_after_offset(self): + skipped_results = 410 + response_pb = _make_query_response(skipped_results=skipped_results) + snapshot = self._call_fut(response_pb, None) + self.assertIsNone(snapshot) + + def test_response(self): + from google.cloud.firestore_v1.document import DocumentSnapshot + + client = _make_client() + collection = client.collection("a", "b", "c") + other_collection = client.collection("a", "b", "d") + to_match = other_collection.document("gigantic") + data = {"a": 901, "b": True} + response_pb = _make_query_response(name=to_match._document_path, data=data) + + snapshot = self._call_fut(response_pb, collection) + self.assertIsInstance(snapshot, DocumentSnapshot) + self.assertEqual(snapshot.reference._document_path, to_match._document_path) + self.assertEqual(snapshot.to_dict(), data) + self.assertTrue(snapshot.exists) + self.assertEqual(snapshot.read_time, response_pb.read_time) + self.assertEqual(snapshot.create_time, response_pb.document.create_time) + self.assertEqual(snapshot.update_time, response_pb.document.update_time) + + def _make_credentials(): import google.auth.credentials diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index acf514775779..d025fa71368f 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -193,12 +193,10 @@ def load(self): if self._leaser is None: return 0 - return max( - [ - self._leaser.message_count / self._flow_control.max_messages, - self._leaser.bytes / self._flow_control.max_bytes, - ] - ) + messages_percent = self._leaser.message_count / self._flow_control.max_messages + bytes_percent = self._leaser.bytes / self._flow_control.max_bytes + print(f"{messages_percent}, {bytes_percent}") + return max(messages_percent, bytes_percent) def add_close_callback(self, callback): """Schedules a callable when the manager closes. @@ -210,10 +208,12 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" + print(self.load) if self.load >= 1.0: if self._consumer is not None and not self._consumer.is_paused: _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) self._consumer.pause() + print('paused') def maybe_resume_consumer(self): """Check the current load and resume the consumer if needed.""" @@ -227,6 +227,7 @@ def maybe_resume_consumer(self): return if self.load < self.flow_control.resume_threshold: + print('resuming') self._consumer.resume() else: _LOGGER.debug("Did not resume, current load is %s", self.load) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index e6001f8e7801..80349240fabb 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -65,7 +65,6 @@ def cleanup(): for to_call, argument in registry: to_call(argument) - def test_publish_messages(publisher, topic_path, cleanup): futures = [] # Make sure the topic gets deleted.