Skip to content

[Feature][RoutineLoad] Support for consuming kafka from the point of time#5832

Merged
morningman merged 1 commit intoapache:masterfrom
morningman:offset_by_time
May 22, 2021
Merged

[Feature][RoutineLoad] Support for consuming kafka from the point of time#5832
morningman merged 1 commit intoapache:masterfrom
morningman:offset_by_time

Conversation

@morningman
Copy link
Contributor

Proposed changes

Support when creating a kafka routine load, start consumption from a specified point in time instead of a specific offset.
eg:

FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "2021-10-10 11:00:00"
);

or

FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"
);

This PR also reconstructed the analysis method of properties when creating or altering
routine load jobs, and unified the analysis process in the RoutineLoadDataSourceProperties class.

Types of changes

What types of changes does your code introduce to Doris?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation Update (if none of the other choices apply)
  • Code refactor (Modify the code structure, format the code, etc...)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

  • I have created an issue on (Fix #ISSUE) and described the bug/feature there in detail
  • Compiling and unit tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • If these changes need document changes, I have updated the document
  • Any dependent changes have been merged

Further comments

If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...

@morningman morningman added kind/feature Categorizes issue or PR as related to a new feature. area/load Issues or PRs related to all kinds of load api-review Categorizes an issue or PR as actively needing an API review. labels May 17, 2021
@morningman morningman self-assigned this May 17, 2021
@morningman morningman changed the title [Feature][] Support for consuming kafka from the point of time [Feature][RoutineLoad] Support for consuming kafka from the point of time May 17, 2021
@EmmyMiao87 EmmyMiao87 self-assigned this May 17, 2021

2) OFFSET_END: Subscribe from the end.

3) Timestamp, the format is the same as kafka_offsets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
3) Timestamp, the format is the same as kafka_offsets
3) Timestamp, the format is the same as kafka_offsets

Example:

`"property.kafka_default_offsets" = "OFFSET_BEGINNING"`
`"property.kafka_default_offsets" = "2021-05-11 10:00:00"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`"property.kafka_default_offsets" = "2021-05-11 10:00:00"`
`"property.kafka_default_offsets" = "2021-05-11 10:00:00"`


2) OFFSET_END: Subscribe from the end.

3) Timestamp, the format must be like: "2021-05-11 10:00:00", the system will automatically locate the offset of the first message greater than or equal to the timestamp.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
3) Timestamp, the format must be like: "2021-05-11 10:00:00", the system will automatically locate the offset of the first message greater than or equal to the timestamp.
3) Timestamp, the format must be like: "2021-05-11 10:00:00", the system will automatically locate the offset of the first message greater than or equal to the timestamp.

for (String kafkaOffsetsStr : kafkaOffsetsStringList) {
if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) {
foundTime = true;
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else if (String is number) {}
else { format error}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kafkaOffsetsStr can be "OFFSET_END", "OFFSET_BEGINNING", number, "2020-10-10 00:00:00",
so only check if it is number is not enough.

EmmyMiao87
EmmyMiao87 previously approved these changes May 19, 2021
Copy link
Contributor

@EmmyMiao87 EmmyMiao87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

second

thrid

4

5

6

self review

doc2

fix alter

fix bug

fix ut

fix docs

fix doc
Copy link
Contributor

@EmmyMiao87 EmmyMiao87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@EmmyMiao87 EmmyMiao87 added the approved Indicates a PR has been approved by one committer. label May 20, 2021
@morningman morningman merged commit 07ad038 into apache:master May 22, 2021
stdpain pushed a commit to stdpain/incubator-doris that referenced this pull request Jul 8, 2021
…time (apache#5832)

Support when creating a kafka routine load, start consumption from a specified point in time instead of a specific offset.
eg:
```
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "2021-10-10 11:00:00"
);

or

FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"
);
```

This PR also reconstructed the analysis method of properties when creating or altering
routine load jobs, and unified the analysis process in the `RoutineLoadDataSourceProperties` class.
@morningman morningman mentioned this pull request Oct 10, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api-review Categorizes an issue or PR as actively needing an API review. approved Indicates a PR has been approved by one committer. area/load Issues or PRs related to all kinds of load kind/feature Categorizes issue or PR as related to a new feature.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants