Skip to content

Commit 690f96b

Browse files
committed
Revert "allow providing stream start ts (#162)"
This reverts commit 4efd958.
1 parent 1f3b757 commit 690f96b

File tree

2 files changed

+7
-42
lines changed

2 files changed

+7
-42
lines changed

fauna/client/client.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,13 @@ class StreamOptions:
5656
5757
* max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
5858
* max_backoff - The maximum backoff in seconds for an individual retry.
59-
* start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
60-
the timestamp.
6159
* status_events - Indicates if stream should include status events. Status events are periodic events that
62-
update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
63-
about the cost of maintaining the stream other than the cost of the received events.
60+
* update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
61+
* about about the cost of maintaining the stream other than the cost of the received events.
6462
"""
6563

6664
max_attempts: Optional[int] = None
6765
max_backoff: Optional[int] = None
68-
start_ts: Optional[int] = None
6966
status_events: bool = False
7067

7168

@@ -553,8 +550,6 @@ def _create_stream(self):
553550
data: Dict[str, Any] = {"token": self._token.token}
554551
if self.last_ts is not None:
555552
data["start_ts"] = self.last_ts
556-
elif self._opts.start_ts is not None:
557-
data["start_ts"] = self._opts.start_ts
558553

559554
return self._http_client.stream(
560555
url=self._endpoint, headers=self._headers, data=data)

tests/integration/test_stream.py

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import threading
22
import time
3+
from datetime import timedelta
4+
35
import pytest
46
import httpx
57
import fauna
8+
69
from fauna import fql
710
from fauna.client import Client, StreamOptions
811
from fauna.http.httpx_client import HTTPXClient
@@ -74,6 +77,8 @@ def thread_fn():
7477
def test_error_on_stream(scoped_client):
7578
scoped_client.query(fql("Collection.create({name: 'Product'})"))
7679

80+
events = []
81+
7782
def thread_fn():
7883
stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()"))
7984

@@ -166,41 +171,6 @@ def thread_fn():
166171
assert events == ["add", "add", "add"]
167172

168173

169-
def test_providing_start_ts(scoped_client):
170-
scoped_client.query(fql("Collection.create({name: 'Product'})"))
171-
172-
stream_token = scoped_client.query(fql("Product.all().toStream()")).data
173-
174-
createOne = scoped_client.query(fql("Product.create({})"))
175-
createTwo = scoped_client.query(fql("Product.create({})"))
176-
createThree = scoped_client.query(fql("Product.create({})"))
177-
178-
events = []
179-
180-
def thread_fn():
181-
# replay excludes the ts that was passed in, it provides events for all ts after the one provided
182-
stream = scoped_client.stream(stream_token,
183-
StreamOptions(start_ts=createOne.txn_ts))
184-
with stream as iter:
185-
for event in iter:
186-
events.append(event)
187-
if (len(events) == 3):
188-
iter.close()
189-
190-
stream_thread = threading.Thread(target=thread_fn)
191-
stream_thread.start()
192-
193-
# adds a delay so the thread can open the stream,
194-
# otherwise we could miss some events
195-
time.sleep(0.5)
196-
197-
createFour = scoped_client.query(fql("Product.create({})"))
198-
stream_thread.join()
199-
assert events[0]["txn_ts"] == createTwo.txn_ts
200-
assert events[1]["txn_ts"] == createThree.txn_ts
201-
assert events[2]["txn_ts"] == createFour.txn_ts
202-
203-
204174
@pytest.mark.xfail(reason="not currently supported by core")
205175
def test_handle_status_events(scoped_client):
206176
scoped_client.query(fql("Collection.create({name: 'Product'})"))

0 commit comments

Comments
 (0)