Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,10 @@ def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", th
self.throughput = throughput

def committed_messages(self):
return filter(lambda m: 'committed' in m and m['committed'], self.messages())
return list(filter(lambda m: 'committed' in m and m['committed'], self.messages()))

def sent_messages(self):
return filter(lambda m: 'committed' not in m or not m['committed'], self.messages())
return list(filter(lambda m: 'committed' not in m or not m['committed'], self.messages()))

def start(self):
self.logger.info("Creating connector VerifiableSourceConnector %s", self.name)
Expand All @@ -467,10 +467,10 @@ def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]):
self.topics = topics

def flushed_messages(self):
return filter(lambda m: 'flushed' in m and m['flushed'], self.messages())
return list(filter(lambda m: 'flushed' in m and m['flushed'], self.messages()))

def received_messages(self):
return filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages())
return list(filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages()))

def start(self):
self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
Expand Down
14 changes: 7 additions & 7 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ def test_pause_and_resume_source(self, connect_protocol):
err_msg="Failed to see connector transition to the PAUSED state")

# verify that we do not produce new messages while paused
num_messages = len(list(self.source.sent_messages()))
num_messages = len(self.source.sent_messages())
time.sleep(10)
assert num_messages == len(list(self.source.sent_messages())), "Paused source connector should not produce any messages"
assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages"

self.cc.resume_connector(self.source.name)

Expand All @@ -239,7 +239,7 @@ def test_pause_and_resume_source(self, connect_protocol):
err_msg="Failed to see connector transition to the RUNNING state")

# after resuming, we should see records produced again
wait_until(lambda: len(list(self.source.sent_messages())) > num_messages, timeout_sec=30,
wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30,
err_msg="Failed to produce messages after resuming source connector")

@cluster(num_nodes=5)
Expand All @@ -259,7 +259,7 @@ def test_pause_and_resume_sink(self, connect_protocol):
self.source = VerifiableSource(self.cc, topic=self.TOPIC)
self.source.start()

wait_until(lambda: len(list(self.source.committed_messages())) > 0, timeout_sec=30,
wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30,
err_msg="Timeout expired waiting for source task to produce a message")

self.sink = VerifiableSink(self.cc, topics=[self.TOPIC])
Expand All @@ -276,9 +276,9 @@ def test_pause_and_resume_sink(self, connect_protocol):
err_msg="Failed to see connector transition to the PAUSED state")

# verify that we do not consume new messages while paused
num_messages = len(list(self.sink.received_messages()))
num_messages = len(self.sink.received_messages())
time.sleep(10)
assert num_messages == len(list(self.sink.received_messages())), "Paused sink connector should not consume any messages"
assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"

self.cc.resume_connector(self.sink.name)

Expand All @@ -287,7 +287,7 @@ def test_pause_and_resume_sink(self, connect_protocol):
err_msg="Failed to see connector transition to the RUNNING state")

# after resuming, we should see records consumed again
wait_until(lambda: len(list(self.sink.received_messages())) > num_messages, timeout_sec=30,
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
err_msg="Failed to consume messages after resuming sink connector")

@cluster(num_nodes=5)
Expand Down