Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Conversation

@Demogorgon314
Copy link
Member

@Demogorgon314 Demogorgon314 commented Nov 23, 2021

Motivation

Currently, the KoP store message logic is in KafkaRequestHandler, to avoid duplicate code and easier to support idempotent produce, we must refactor the message storage logic.

Modifications

  • Refactor message storage logic.

  • The ReplicaManager is a public interface to store messages, we should always use ReplicaManager to store the message in any situation.

  • PartitionLog mapping to Kafka is Log, the reason for using this name is to avoid naming conflict, like: log.debug().

  • PartitionLogManager hold all PartitionLog, key is full partition name, value is PartitionLog.

TODO

  • Add fetch operation

@Demogorgon314 Demogorgon314 self-assigned this Nov 23, 2021
@Demogorgon314 Demogorgon314 marked this pull request as ready for review November 25, 2021 01:27
@BewareMyPower
Copy link
Collaborator

NOTE: I'll go out for a while and the review is not completed. You can address these comments first.

@BewareMyPower
Copy link
Collaborator

LGTM but left some minor comments.

@BewareMyPower BewareMyPower merged commit 1c07bd5 into streamnative:master Nov 28, 2021
@Demogorgon314 Demogorgon314 deleted the split-storage branch December 1, 2021 13:32
BewareMyPower pushed a commit that referenced this pull request Dec 3, 2021
### Motivation

Currently, the KoP store message logic is in `KafkaRequestHandler`, to avoid duplicate code and easier to support idempotent produce, we must refactor the message storage logic.

### Modifications
* Refactor message storage logic.
* The `ReplicaManager` is a public interface to store messages, we should always use `ReplicaManager` to store the message in any situation.

* `PartitionLog` mapping to Kafka is `Log`, the reason for using this name is to avoid naming conflict, like: `log.debug()`.

* `PartitionLogManager` hold all `PartitionLog`, key is full partition name, value is `PartitionLog`.

### TODO
* Add fetch operation
BewareMyPower pushed a commit that referenced this pull request Dec 3, 2021
Currently, the KoP store message logic is in `KafkaRequestHandler`, to avoid duplicate code and easier to support idempotent produce, we must refactor the message storage logic.

* Refactor message storage logic.
* The `ReplicaManager` is a public interface to store messages, we should always use `ReplicaManager` to store the message in any situation.

* `PartitionLog` mapping to Kafka is `Log`, the reason for using this name is to avoid naming conflict, like: `log.debug()`.

* `PartitionLogManager` hold all `PartitionLog`, key is full partition name, value is `PartitionLog`.

* Add fetch operation

Fix conflicts made by #819.

The `path` method was removed from master. Keep this method in
branch-2.8.1.
eolivelli pushed a commit to eolivelli/kop that referenced this pull request Dec 17, 2021
…e#925)

Currently, the KoP store message logic is in `KafkaRequestHandler`, to avoid duplicate code and easier to support idempotent produce, we must refactor the message storage logic.

* Refactor message storage logic.
* The `ReplicaManager` is a public interface to store messages, we should always use `ReplicaManager` to store the message in any situation.

* `PartitionLog` mapping to Kafka is `Log`, the reason for using this name is to avoid naming conflict, like: `log.debug()`.

* `PartitionLogManager` hold all `PartitionLog`, key is full partition name, value is `PartitionLog`.

* Add fetch operation

(cherry picked from commit 1c07bd5)

authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName))
.whenComplete((isAuthorized, ex) -> {
if (ex != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems those two if can combine together. Like

if (ex != null || !isAuthorized) {
    if (ex != null) {
        log.error("Write topic authorize failed, topic - {}. {}",
                fullPartitionName, ex.getMessage());
    }
    unauthorizedTopicResponsesMap.put(topicPartition,
            new ProduceResponse.PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED));
    completeOne.run();
    return;
}

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants