Kafka: Reordered segment allocation causes spurious failures #5761

@gianm

Description

In one of our test clusters we saw Kafka index task failures corresponding to sequence_name_prev_id_sha1 unique constraint violations on the overlord. The query getting run was this one:

            handle.createStatement(
                StringUtils.format(
                    "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
                    dbTables.getPendingSegmentsTable(), connector.getQuoteString()
                )
            )
                  .bind("id", newIdentifier.getIdentifierAsString())
                  .bind("dataSource", dataSource)
                  .bind("created_date", DateTimes.nowUtc().toString())
                  .bind("start", interval.getStart().toString())
                  .bind("end", interval.getEnd().toString())
                  .bind("sequence_name", sequenceName)
                  .bind("sequence_prev_id", previousSegmentIdNotNull)
                  .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
                  .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
                  .execute();

The constraint violation happens when the (sequence_name, previousSegmentIdNotNull) tuple is not unique. So in particular, it happens when the segment sequence "forks" and goes from the same segment X into two subsequent segments Y and Z. And from looking at the tasks involved, the events that occurred looked something like this:
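To make the failure mode concrete, here is a minimal sketch (a hypothetical reconstruction, not the actual Druid code) of why a fork always collides: the unique key is derived only from the sequence name and the *previous* segment id, so it does not depend on the new segment being allocated. Any second child forking from the same parent X therefore produces the same key.

```python
import hashlib

def sequence_name_prev_id_sha1(sequence_name, prev_segment_id):
    # Hypothetical reconstruction: a SHA-1 over (sequence_name, previous
    # segment id); the pending_segments table has a unique constraint on it.
    return hashlib.sha1((sequence_name + prev_segment_id).encode("utf-8")).hexdigest()

# Allocating child Y after parent X, then child Z after the same parent X:
key_for_y = sequence_name_prev_id_sha1("index_kafka_wiki_abc", "X")
key_for_z = sequence_name_prev_id_sha1("index_kafka_wiki_abc", "X")

# The key ignores the child's own identity, so the second INSERT violates
# the unique constraint no matter what Y and Z actually are.
assert key_for_y == key_for_z
```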

  1. A Kafka index task allocated some segments and then failed for some reason.
  2. A new task was started up to re-read from the same offset.
  3. The new task also tried to allocate segments, and it worked for a while, but eventually it tried to allocate one in a different order than the original task. This fails and so does the whole task.
  4. Another task starts up, but it fails for the same reason.
  5. The cycle of violence continues ad infinitum.

I think the problem is that allocation fails if it isn't done in the same order as it was originally done, but since #4815 it's no longer guaranteed that two Kafka index tasks starting from the same partitions/offsets will create segments in the same order (because the mixing of messages from different partitions is non-deterministic). I believe this is meant to be okay, since the hypothetical two tasks would still end up in the same place, due to the restriction that there is now just one segment per interval.

(Aside: if there were more than one segment per interval, the tasks would not be guaranteed to end up at the same place, since they wouldn't necessarily make the same decision about which rows to put in which segments for the same interval.)
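A small sketch of the ordering argument above (assumed model, with made-up partitions and day-granularity intervals): two tasks read the same per-partition offsets but interleave partitions differently, so the *order* of first-seen intervals differs even though the final set of intervals — and hence of segments, given one segment per interval — is identical.

```python
# Each record maps to a time interval; one segment per interval per sequence.
partition_0 = ["2018-01-01", "2018-01-02"]
partition_1 = ["2018-01-02", "2018-01-03"]

def allocation_order(interleaved_records):
    """Return intervals in the order a task would first allocate them."""
    seen, order = set(), []
    for interval in interleaved_records:
        if interval not in seen:  # at most one segment per interval
            seen.add(interval)
            order.append(interval)
    return order

task_a = allocation_order(partition_0 + partition_1)  # happens to read p0 first
task_b = allocation_order(partition_1 + partition_0)  # happens to read p1 first

assert task_a != task_b            # different allocation order...
assert set(task_a) == set(task_b)  # ...but the same segments in the end
```

This is why the previous-segment-id check is stricter than it needs to be: it rejects a reordering that is harmless under the one-segment-per-interval restriction.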

So I am thinking we should:

  1. Confirm that there is meant to be only one segment per interval per sequence now, and add sanity checks to enforce this if they do not already exist.
  2. Confirm there is no longer any need for the previous-segment-id check.
  3. Remove the check: allow a sequence to allocate segments out of order, subject to there being at most one segment per interval per sequence.
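The relaxed allocation proposed in step 3 could look something like this sketch (hypothetical class and names, not the actual overlord code): key allocations by (sequence, interval) instead of by previous segment id, and make repeated allocations for the same interval idempotent rather than failing.

```python
class SequenceAllocator:
    """Sketch of allocation keyed on (sequence_name, interval) only."""

    def __init__(self):
        self.by_interval = {}  # (sequence_name, interval) -> segment id

    def allocate(self, sequence_name, interval, new_id):
        key = (sequence_name, interval)
        if key in self.by_interval:
            # Idempotent: a restarted task re-allocating the same interval,
            # possibly in a different order, gets the existing segment back
            # instead of hitting a constraint violation.
            return self.by_interval[key]
        self.by_interval[key] = new_id
        return new_id

alloc = SequenceAllocator()
assert alloc.allocate("seq", "2018-01-01", "A") == "A"
# A second task forking from the same point reuses the segment:
assert alloc.allocate("seq", "2018-01-01", "B") == "A"
```

This keeps the invariant from step 1 (at most one segment per interval per sequence) as the sole uniqueness rule, with no ordering requirement.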

/cc @pjain1 @jihoonson
