Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def __init__(self, **configs):
self._manager.start()

# Bootstrap on __init__
self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms']))
self._manager.run(self._manager.bootstrap, self.config['bootstrap_timeout_ms'])
self._closed = False
self._controller_id = None
self._coordinator_cache = {} # {group_id: node_id}
Expand Down
47 changes: 21 additions & 26 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,13 @@ def offsets_by_times(self, timestamps, timeout_ms=None):
Raises:
KafkaTimeoutError if timeout_ms provided
"""
offsets = self._fetch_offsets_by_times(timestamps, timeout_ms)
offsets = self._client._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
for tp in timestamps:
if tp not in offsets:
offsets[tp] = None
return offsets

def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None):
if not timestamps:
return {}

Expand All @@ -239,35 +239,30 @@ def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
return {}

future = self._send_list_offsets_requests(timestamps)
self._client.poll(future=future, timeout_ms=timer.timeout_ms)

# Timeout w/o future completion
if not future.is_done:
try:
offsets, retry = await self._client._manager.wait_for(future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
break

if future.succeeded():
offsets, retry = future.value
except Exception as exc:
if not getattr(exc, 'retriable', False):
raise
if getattr(exc, 'invalid_metadata', False) or self._client._manager.cluster.need_update:
refresh_future = self._client._manager.update_metadata()
try:
await self._client._manager.wait_for(refresh_future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
break
else:
delay = self.config['retry_backoff_ms'] / 1000
if timer.timeout_ms is not None:
delay = min(delay, timer.timeout_ms / 1000)
await self._client._manager._net.sleep(delay)
else:
fetched_offsets.update(offsets)
if not retry:
return fetched_offsets

timestamps = {tp: timestamps[tp] for tp in retry}

elif not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type

elif future.exception.invalid_metadata or self._client.cluster.need_update:
refresh_future = self._client.cluster.request_update()
self._client.poll(future=refresh_future, timeout_ms=timer.timeout_ms)

if not future.is_done:
break
else:
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
time.sleep(timer.timeout_ms / 1000)

timer.maybe_raise()

raise Errors.KafkaTimeoutError(
Expand All @@ -283,7 +278,7 @@ def end_offsets(self, partitions, timeout_ms):

def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
timestamps = dict([(tp, timestamp) for tp in partitions])
offsets = self._fetch_offsets_by_times(timestamps, timeout_ms)
offsets = self._client._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
for tp in timestamps:
offsets[tp] = offsets[tp].offset
return offsets
Expand Down
11 changes: 8 additions & 3 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,14 @@ def __init__(self, *topics, **configs):
self._metrics = None

self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)

# Get auto-discovered / normalized version from client
self.config['api_version'] = self._client.get_broker_version(timeout_ms=self.config['api_version_auto_timeout_ms'])
self._manager = self._client._manager

# If api_version was not passed explicitly, bootstrap to auto-discover
# it. bootstrap is passed as a deferred coroutine so that once the IO
# thread is introduced in a later phase it runs on the IO thread.
if self._manager.broker_version_data is None:
self._manager.run(self._manager.bootstrap, self.config['api_version_auto_timeout_ms'])
self.config['api_version'] = self._manager.broker_version

# Coordinator configurations are different for older brokers
# max_poll_interval_ms is not supported directly -- it must the be
Expand Down
32 changes: 32 additions & 0 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,38 @@ def stop(self, timeout=None):
def poll(self, timeout_ms=None, future=None):
    """Run one poll cycle on the underlying network layer.

    Thin delegation wrapper: forwards `timeout_ms` and `future`
    unchanged to the network object's own poll() and returns its result.
    """
    net = self._net
    return net.poll(timeout_ms=timeout_ms, future=future)

async def wait_for(self, future, timeout_ms):
    """Await `future` with a timeout in ms. Raises KafkaTimeoutError on timeout.

    Must be awaited from a coroutine running on this loop. The underlying
    future is not cancelled on timeout — it continues to run; the timeout
    only unblocks the awaiter.

    Arguments:
        future: the project Future to wait on (supports add_callback /
            add_errback / is_done).
        timeout_ms (int or None): maximum wait in milliseconds; None
            means wait indefinitely.

    Returns:
        The resolved value of `future`.

    Raises:
        KafkaTimeoutError: if `timeout_ms` elapses first.
        Exception: whatever `future` fails with, re-raised via the wrapper.
    """
    # No timeout requested: just await the target future directly.
    if timeout_ms is None:
        return await future
    # Mirror the target future's outcome into a fresh wrapper so the
    # timeout path can complete the wrapper without touching `future`.
    wrapper = Future()
    def _on_success(value):
        # First-writer-wins guard: the timeout may already have failed
        # the wrapper before the target future resolved.
        if not wrapper.is_done:
            wrapper.success(value)
    def _on_failure(exc):
        if not wrapper.is_done:
            wrapper.failure(exc)
    # NOTE(review): assumes add_callback/add_errback fire immediately if
    # `future` is already resolved — confirm against the Future class.
    future.add_callback(_on_success)
    future.add_errback(_on_failure)
    def _on_timeout():
        # Only fail the wrapper if the target future has not already
        # completed it via the callbacks above.
        if not wrapper.is_done:
            wrapper.failure(Errors.KafkaTimeoutError(
                'Timed out after %s ms' % timeout_ms))
    # call_later takes seconds; timeout_ms is milliseconds.
    timer = self._net.call_later(timeout_ms / 1000, _on_timeout)
    try:
        return await wrapper
    finally:
        # Best-effort cleanup of the pending timer whichever way the
        # wrapper resolved; unschedule may raise ValueError if the timer
        # was already removed concurrently — that is benign here.
        if not timer.is_done:
            try:
                self._net.unschedule(timer)
            except ValueError:
                pass

async def _invoke(self, coro, args):
"""Invoke coro/awaitable/function and fully resolve the result.

Expand Down
Loading
Loading