Skip to content

[fix][io] Make record properties configurable for kinesis source#24495

Merged
shibd merged 2 commits intoapache:masterfrom
shibd:make_kinesis
Jul 10, 2025
Merged

[fix][io] Make record properties configurable for kinesis source#24495
shibd merged 2 commits intoapache:masterfrom
shibd:make_kinesis

Conversation

@shibd
Copy link
Copy Markdown
Member

@shibd shibd commented Jul 9, 2025

Motivation

The Kinesis source connector's handling of metadata properties was rigid and contained a critical bug.

  • Properties were not configurable: Users could not select which Kinesis metadata properties to include in Pulsar messages. This forced all properties to be included, which could be inefficient in terms of message size.

  • Key Collision Bug: A bug was discovered where all property keys were defined as an empty string (""), causing a key collision that resulted in the loss of all metadata except for the last one set (sequenceNumber).

This PR fixes these issues by introducing a configuration to control which properties are included, which also resolves the underlying bug.

Modifications

  • Added a new configuration, kinesisRecordProperties, to KinesisSourceConfig.java. This allows users to provide a comma-separated list of properties to include.
  • The default value retains all previously available properties to ensure backward compatibility.
  • As part of making the properties configurable, the empty string constants in KinesisRecord.java were replaced with unique, descriptive keys (e.g., "kinesis.arrival.timestamp"), fixing the data loss bug.

Verifying this change

  • Add unit test to covert this change

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jul 9, 2025

Codecov Report

❌ Patch coverage is 76.92308% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.28%. Comparing base (bbc6224) to head (42fc078).
⚠️ Report is 1311 commits behind head on master.

Files with missing lines Patch % Lines
...ache/pulsar/io/kinesis/KinesisRecordProcessor.java 0.00% 5 Missing ⚠️
.../apache/pulsar/io/kinesis/KinesisSourceConfig.java 87.50% 0 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24495      +/-   ##
============================================
+ Coverage     73.57%   74.28%   +0.71%     
- Complexity    32624    32843     +219     
============================================
  Files          1877     1868       -9     
  Lines        139502   145902    +6400     
  Branches      15299    16728    +1429     
============================================
+ Hits         102638   108390    +5752     
- Misses        28908    28914       +6     
- Partials       7956     8598     +642     
Flag Coverage Δ
inttests 26.71% <ø> (+2.13%) ⬆️
systests 23.33% <0.00%> (-0.99%) ⬇️
unittests 73.77% <76.92%> (+0.92%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/io/kinesis/KinesisRecord.java 73.52% <100.00%> (+73.52%) ⬆️
.../apache/pulsar/io/kinesis/KinesisSourceConfig.java 27.50% <87.50%> (-9.76%) ⬇️
...ache/pulsar/io/kinesis/KinesisRecordProcessor.java 0.00% <0.00%> (ø)

... and 1091 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@RobertIndie RobertIndie requested a review from Copilot July 10, 2025 02:20
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR makes Kinesis metadata properties configurable and fixes a key-collision bug by giving each property a unique key. It updates the source configuration, record processor, and record class to honor a user-defined list of properties, and adds unit tests for the new behavior.

  • Introduce kinesisRecordProperties config to select which metadata properties to include
  • Replace empty-string constants in KinesisRecord with descriptive keys and conditionally set them
  • Update processor and config loading logic, and add tests for default, custom, and empty property lists

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
KinesisSourceConfig.java Add kinesisRecordProperties field, parse it into a propertiesToInclude set
KinesisRecordProcessor.java Store and pass propertiesToInclude into each KinesisRecord; update logging
KinesisRecord.java Replace empty constants with descriptive keys; conditionally populate properties
KinesisSourceConfigTests.java Add tests for default, custom, and empty property configurations
KinesisRecordTest.java Add tests covering all/some/none property inclusion scenarios
Comments suppressed due to low confidence (1)

pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java:176

  • The default properties test only verifies the total count and two keys; consider asserting all six default properties are present to ensure full coverage.
        assertEquals(properties.size(), 6);

@shibd shibd merged commit e0efcbb into apache:master Jul 10, 2025
51 checks passed
shibd added a commit that referenced this pull request Jul 11, 2025
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Jul 15, 2025
@lhotari
Copy link
Copy Markdown
Member

lhotari commented Jul 17, 2025

btw. #21004 was never cherry-picked to branch-3.0. I'll cherry-pick that to branch-3.0 since there are too many merge conflicts without making that change.

@lhotari
Copy link
Copy Markdown
Member

lhotari commented Jul 17, 2025

btw. #21004 was never cherry-picked to branch-3.0. I'll cherry-pick that to branch-3.0 since there are too many merge conflicts without making that change.

Skipping this cherry-picking since @shibd removed the release label for 3.0.x .

priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 24, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants