
Conversation

@bryanck
Contributor

@bryanck bryanck commented Jul 27, 2024

This PR waits for the coordinator thread to complete when stopping the sink, to ensure there are no coordinator tasks pending before starting a new coordinator. The sink was designed with the assumption that only one coordinator and its tasks are active at a given time.

This also changes the exception thrown from RuntimeException to ConnectException, to be a little more specific.
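
For context, here is a minimal sketch of the stop path described above; the class structure and field names are illustrative, not necessarily the sink's actual code:

import org.apache.kafka.connect.errors.ConnectException;

// Illustrative coordinator thread: terminate() flags the run loop to stop and
// then waits for the thread to finish, so no coordinator work is still pending
// when a new coordinator is started.
class CoordinatorThread extends Thread {
  private volatile boolean terminated;

  @Override
  public void run() {
    while (!terminated) {
      // poll the control topic, process commit events, etc.
    }
  }

  void terminate() {
    terminated = true;
    try {
      join(60_000); // wait up to 60s for the run loop to exit
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ConnectException("Interrupted while waiting for coordinator thread to terminate", e);
    }
  }
}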

@ajantha-bhat
Member

cc: @fqaiser94, you might be interested in the review.

try {
  join(60_000);
} catch (InterruptedException e) {
  throw new ConnectException("Timed out waiting for coordinator thread to terminate", e);
Contributor

[minor] Can we include in the log line how long we waited? Or does the exception message already contain it?

terminated = true;

try {
  join(60_000);
Contributor

[minor] Does it make sense to make this configurable?

Contributor Author

I considered that but felt it was config overkill, and we have hardcoded thread wait timeouts elsewhere like the REST catalog. If this times out then the task should be terminated and restarted by the KC framework. I could add it if you think it would be useful, though.

Contributor

Since the Coordinator is the one doing the commit, and commit retries configured via table properties can make it take longer (for example: https://iceberg.apache.org/docs/nightly/configuration/#table-behavior-properties), making this configurable makes sense to me. That said, in the past we have gone with static timeouts, such as the 10-minute one in this commit: 25eaeba#diff-6ee62ee6bfe801211a8725679859de808af3689a36bb5784342ab4977c7828d0R201

But 1 minute seems too low in this case, to be honest. Can we bump this to a larger number instead, if we don't want to make it configurable? Do you know what the max of these thread wait timeouts is elsewhere, like the REST catalog?

Contributor Author

@bryanck bryanck Jul 29, 2024

I actually need to rethink this a little; join() won't throw an exception when it hits the timeout. As an aside, the REST catalog timeout is also 1 min here.
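
To illustrate the point (a minimal sketch, not the code in this PR): Thread.join(timeout) simply returns when the timeout elapses, so detecting the timeout requires checking whether the thread is still alive afterwards:

import org.apache.kafka.connect.errors.ConnectException;

class CoordinatorStopper {
  // Sketch only: join(timeout) returns silently when the timeout elapses, so
  // check isAlive() afterwards to distinguish "terminated in time" from
  // "still running".
  static void stopAndWait(Thread coordinatorThread) {
    try {
      coordinatorThread.join(60_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ConnectException("Interrupted while waiting for coordinator thread to terminate", e);
    }
    if (coordinatorThread.isAlive()) {
      throw new ConnectException("Timed out waiting for coordinator thread to terminate");
    }
  }
}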

Contributor

@fqaiser94 fqaiser94 left a comment

LGTM, thanks for the ping @ajantha-bhat

I still think it would be better to support conditional commits in Iceberg to solve the actual underlying problem of duplicated file appends; see my comment below for more details.

terminated = true;

try {
  join(60_000);
Contributor

TL;DR: I have no objections to this PR; it is still an incremental improvement.


Just pointing out that this doesn't by any means guarantee that 2 coordinators cannot be running for the same connector at the same time.
IIRC, Kafka Connect will wait 5 seconds (by default, configurable via task.shutdown.graceful.timeout.ms) for a Task to stop.
If a Task does not stop within that time, Kafka Connect will happily abandon that Task and move on to creating a new Task to replace it.
So waiting 60 seconds here is fairly irrelevant; we will really only wait task.shutdown.graceful.timeout.ms, which, as previously mentioned, is only 5 seconds by default.

I've said it before; there is no guaranteed way to prevent 2 Coordinators from running at the same time (unless you want to build a flink-like runtime on top of kafka-connect or bring in external locking solutions, neither of which I recommend).
In my (unfortunate) experience, solutions like this are only ever band-aids that fail at sufficient scale.
A better solution would be to fence zombie data file appends to Iceberg tables (currently, we only fence zombie writes to the Kafka control topic).
The best way to do that would be to add support for conditional-commit-style semantics to Iceberg, something like this: #6513

In the meantime however, this is the best we can do 👍

Contributor Author

@bryanck bryanck Jul 29, 2024

Thanks @fqaiser94, I will think this through some more and review your PR for conditional commits.

Contributor Author

Thinking out loud here, I'm considering using the consumer group generation ID to fence "zombie" coordinators, similar to how Kafka implements zombie fencing. For example, store the generation ID when we start the coordinator and assert that it still matches the current generation before we commit.
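
A rough sketch of that idea, assuming the coordinator has access to its KafkaConsumer; the class and method names here are hypothetical, not the sink's actual API:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.connect.errors.ConnectException;

// Hypothetical fencing check: capture the consumer group generation ID when the
// coordinator starts, then verify it hasn't changed before committing to the table.
class GenerationFence {
  private final KafkaConsumer<byte[], byte[]> consumer;
  private final int startingGenerationId;

  GenerationFence(KafkaConsumer<byte[], byte[]> consumer) {
    this.consumer = consumer;
    this.startingGenerationId = consumer.groupMetadata().generationId();
  }

  void ensureNotFenced() {
    int currentGenerationId = consumer.groupMetadata().generationId();
    if (currentGenerationId != startingGenerationId) {
      throw new ConnectException(
          "Consumer group generation changed from " + startingGenerationId
              + " to " + currentGenerationId + "; coordinator is a zombie and must not commit");
    }
  }
}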

Contributor

Thanks @fqaiser94, I will ... review your PR for conditional commits.

Thanks, I would appreciate any feedback.
It's been hard to get attention on #6513

Contributor

Thinking out loud here, I'm considering using the consumer group generation ID to fence "zombie" coordinators, similar to how Kafka implements zombie fencing. For example, store the generation ID when we start the coordinator and assert that it still matches the current generation before we commit.

I've considered this before (and several, more complex variants of the idea) but sadly there are still race conditions where you would end up with duplicate file appends. The fundamental problem is that there is always a possibility that after you've checked the generation-id matches the current-generation-id but before you've committed to iceberg, another zombie process comes around and commits the same files. In that sense, it's really no different than what we're doing today in the Coordinator with our table-committed-offsets check 🤷

There are ways to close those race conditions, but they still require conditional-commit support from Iceberg to implement the fencing. And at that point, we don't really need to compare consumer-group generation IDs. All we really need to do is assert at commit time that the committed table offsets have not changed since we last checked them. Not before, not after, at commit time.

Contributor

Here's what we could do in kafka-connect-iceberg if iceberg supported conditional commits: 4bff041

Contributor Author

I feel like making an incremental improvement is still worthwhile while changes to Iceberg core are considered.

Contributor

Absolutely; that's why I approved this PR.

@bryanck
Contributor Author

bryanck commented Jul 29, 2024

Thanks for the reviews everyone, I'm going to close this and open a new one after giving it some more thought.

@bryanck bryanck closed this Jul 29, 2024