Skip to content

Implement IngesterAffinity broadcast#6152

Merged
nadav-govari merged 8 commits intonadav/feature-node-based-routingfrom
nadav/node-affinity-broadcast
Feb 17, 2026
Merged

Implement IngesterAffinity broadcast#6152
nadav-govari merged 8 commits intonadav/feature-node-based-routingfrom
nadav/node-affinity-broadcast

Conversation

@nadav-govari
Copy link
Collaborator

@nadav-govari nadav-govari commented Feb 11, 2026

Background

Main idea: https://docs.google.com/document/d/1XUpBdMFnuX8d23erK-XwQkomRgbeRTJ0TJtve7RGW3k/edit?tab=t.0.

All work on this feature will be merged PR by PR into the base branch nadav/feature-node-based-routing, which will then eventually be merged into main once it's fully ready.

PR Description

Creates a new broadcast to prepare for node based routing. The idea is described more in depth in

The primary thinking here is:

  • Ingester affinity score for receiving new requests. This will be used in a weighted power of two choices comparison against other nodes. The node with the higher affinity score wins and receives the request for persistence.
  • The number of open shards for the individual index can act as a tiebreaker.
    • This isn't perfect but we can iterate on it
      Ingesters will move away from keeping shard level data, and instead keep this node level data for routing requests. Routing tables will move to be node based and use the data from these broadcasts to update their routing tables.

@nadav-govari nadav-govari changed the base branch from main to nadav/feature-node-based-routing February 11, 2026 21:12
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// Key used in chitchat to broadcast the ingester affinity score and open shard counts.
pub const INGESTER_AFFINITY_PREFIX: &str = "ingester.affinity";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already use the word affinity for searchers split affinity. I think we can find another ok name for this metric that we don't use already.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, how's ingester capacity? As in, literally the capacity of the ingester to ingest new requests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed the task to BroadcastIngesterCapacity and all references from affinity to capacity.


pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>;

const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could use a comment. I assume you had a duration in mind for that window and then divided by BROADCAST_INTERVAL_PERIOD to get to 6. What's that window duration?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding. It was meant to be 30 seconds.


struct WalCapacityTimeSeries {
wal_capacity: ByteSize,
readings: VecDeque<ByteSize>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a better implementation of a timeserie based on a rotating time window in broadcast. This is a common pattern. So, move the og implementation in common. Abstractify enough so it can be used for both uses cases, import and use it here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LocalShardUpdate and BroadcastIngesterCapacity now both use this new RingBuffer, which is in quickwit-common.

}

impl WalCapacityTimeSeries {
fn new(wal_capacity: ByteSize) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mem or disk? the name should say it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disk, modified.

return None;
}
let oldest = if self.readings.len() > WAL_CAPACITY_LOOKBACK_WINDOW_LEN {
self.readings.pop_back().unwrap()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use expect and state the invariant/conditation that allow you to call expect safely:
.expect("window should not be empty")
.expect("window should have more than 1 measurements")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, though this isn't relevant any longer with the RingBuffer changes.

.weak_state
.upgrade()
.context("ingester state has been dropped")?;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just lock the whole thing fully and make the code more readable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Ok(snapshot) => snapshot,
Err(error) => {
error!("failed to snapshot ingester state: {error}");
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WAL can take multiple BROADCAST_INTERVAL_PERIOD intervals to load. The task should not not stop when we're loading the WAL, only if the state is dropped.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to the following cases:

  1. State dropped: error, stop task
  2. Ingester not initialized: no-op
  3. Ingester ready: happy path

let value = serde_json::to_string(affinity)
.expect("`IngesterAffinity` should be JSON serializable");
self.cluster
.set_self_key_value(INGESTER_AFFINITY_PREFIX, value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't broadcast that over a single key because the open shard counts can be very long.
-> one key per index/source

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The value length is an issue because chitchat uses UDP and every update must fit in a single datagram (MTU))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it similar to LocalShardsUpdate, one key per index/source.

pub fn get_open_shard_counts(&self) -> Vec<(IndexUid, SourceId, usize)> {
self.shards
.values()
.filter(|shard| shard.is_open())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.filter(|shard| shard.is_open())
.filter(|shard| shard.is_advertisable && !shard.is_replica() && shard.is_open())

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took it.

self.buffer.last().copied()
}

pub fn oldest(&self) -> Option<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn oldest(&self) -> Option<T> {
pub fn front(&self) -> Option<T> {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

impl<T: Copy + Default, const N: usize> RingBuffer<T, N> {
pub fn push(&mut self, value: T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn push(&mut self, value: T) {
pub fn push_back(&mut self, value: T) {

Let's just copy (half of) the VecDeque API.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/// Elements are stored in a flat array of size `N` and rotated on each push.
/// The newest element is always at position `N - 1` (the last slot), and the
/// oldest is at position `N - len`.
pub struct RingBuffer<T: Copy + Default, const N: usize> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noice

readings: RingBuffer<ByteSize, WAL_CAPACITY_READINGS_LEN>,
}

impl WalDiskCapacityTimeSeries {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we discussed using memory?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, now that I realize they're capped in the chart, I think they're functionally the same, but memory feels like a cleaner number to read. So I switched it to memory.

/// Elements are stored in a flat array of size `N` and rotated on each push.
/// The newest element is always at position `N - 1` (the last slot), and the
/// oldest is at position `N - len`.
pub struct RingBuffer<T: Copy + Default, const N: usize> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude can easily make push O(1), right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it can :)

pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";

/// Prefix used in chitchat to broadcast per-source ingester capacity scores and open shard counts.
pub const INGESTER_CAPACITY_PREFIX: &str = "ingester.capacity:";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub const INGESTER_CAPACITY_PREFIX: &str = "ingester.capacity:";
pub const INGESTER_CAPACITY_SCORE_PREFIX: &str = "ingester.capacity_score:";

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -0,0 +1,457 @@
// Copyright 2021-Present Datadog, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use capacity_score everywhere.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/// Takes a snapshot of the primary shards hosted by the ingester at regular intervals and
/// broadcasts it to other nodes via Chitchat.
pub(super) struct BroadcastLocalShardsTask {
pub struct BroadcastLocalShardsTask {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub struct BroadcastLocalShardsTask {
pub(crate) struct BroadcastLocalShardsTask {

@nadav-govari nadav-govari merged commit 76cfc84 into nadav/feature-node-based-routing Feb 17, 2026
10 of 14 checks passed
@nadav-govari nadav-govari deleted the nadav/node-affinity-broadcast branch February 17, 2026 16:45
nadav-govari added a commit that referenced this pull request Mar 16, 2026
* Implement IngesterCapacityScore broadcast (#6152)

* Implement node based routing table (#6159)

* Use new node based routing table for routing decisions (#6163)

* Piggyback routing update on persist response (#6173)

* Remove unused shard_ids in persist protos (#6169)

* Add availability zone awareness to node based routing (#6189)

* Remove old routing table; Take both disk and memory WAL readings (#6193)

* Add az-aware ingest attempts metric (#6194)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants