Skip to content

KAFKA-8570: Grow buffer to hold down converted records if it was insufficiently sized#6974

Merged
hachikuji merged 2 commits intoapache:trunkfrom
dhruvilshah3:downconversion-bug
Jun 21, 2019
Merged

KAFKA-8570: Grow buffer to hold down converted records if it was insufficiently sized#6974
hachikuji merged 2 commits intoapache:trunkfrom
dhruvilshah3:downconversion-bug

Conversation

@dhruvilshah3
Copy link
Copy Markdown
Contributor

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dhruvilshah3 . Left one comment.

for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
if (recordBatchAndRecords.batch.magic() <= toMagic) {
buffer = Utils.ensureCapacity(buffer, buffer.limit() + recordBatchAndRecords.batch.sizeInBytes());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be buffer.position() instead of buffer.limit()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doh, yes. thanks!

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the fix!

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

@hachikuji
Copy link
Copy Markdown
Contributor

I filed https://issues.apache.org/jira/browse/KAFKA-8577 for the failing test case. I have seen it fail in other PRs, so I think it is not related.

@hachikuji hachikuji merged commit 5f8b289 into apache:trunk Jun 21, 2019
hachikuji pushed a commit that referenced this pull request Jun 21, 2019
…fficiently sized (#6974)

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Reviewers: Jason Gustafson <jason@confluent.io>
hachikuji pushed a commit that referenced this pull request Jun 21, 2019
…fficiently sized (#6974)

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Reviewers: Jason Gustafson <jason@confluent.io>
hachikuji pushed a commit that referenced this pull request Jun 21, 2019
…fficiently sized (#6974)

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Reviewers: Jason Gustafson <jason@confluent.io>
hachikuji pushed a commit that referenced this pull request Jun 21, 2019
…fficiently sized (#6974)

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Reviewers: Jason Gustafson <jason@confluent.io>
@dhruvilshah3 dhruvilshah3 deleted the downconversion-bug branch July 3, 2019 16:41
hachikuji pushed a commit that referenced this pull request Jul 12, 2019
…fficiently sized (#7071)

Backport #6974 to 1.1

When the log contains out of order message formats (for example v2 message followed by v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of down-converted batches using:

```
    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
    }
```

This almost always underestimates size of down-converted records if the batch is between 1kB-64kB in size. In general, this means we may under estimate the total size required for compressed batches.

Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer we copy the down converted records into when we see a message <= the target message format. This assumption becomes incorrect when the log contains out of order message formats, for example because of leaders flapping while upgrading the message format.

Reviewers: Jason Gustafson <jason@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants