[pulsar-broker] Broker extensions to allow operators of enterprise wide cluster better control and flexibility#12536
Conversation
…trol when operating org-wide messaging platform
…trol when operating org-wide messaging platform
|
@madhavan-narayanan:Thanks for providing doc info! |
|
@madhavan-narayanan:Thanks for providing doc info! |
|
@madhavan-narayanan:Thanks for your contribution. For this PR, do we need to update docs? |
|
@madhavan-narayanan:Thanks for providing doc info! |
| return create(ledgerEntry,null); | ||
| } | ||
|
|
||
| public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor managedLedgerInterceptor) { |
There was a problem hiding this comment.
Put the 'managedLedgerInterceptor' in here seem a little strange.
IMHO, it's better put outside of create, the managedLedgerInterceptor processes the ledgerEntry and then pass the result to this create method.
There was a problem hiding this comment.
I extended the create() to ensure that future usages of this method from other places don't miss out the interceptor.
I do agree that it will be clean to have the calling methods process the ledger entry using interceptors and pass the result.
…et is used to ensure iteration order is same as insertion order
|
@madhavan-narayanan It should be good to start with a proposal for this new feature, I noticed there are new API introduced. For starting a proposal for Pulsar, you can check https://lists.apache.org/thread/m8dr0hz7qn7rkd48bcp430lcq2q3674g |
@codelipenghui , thanks for your comment. I will start a proposal as you suggested. |
| private int newEntriesCheckDelayInMillis = 10; | ||
| private Clock clock = Clock.systemUTC(); | ||
| private ManagedLedgerInterceptor managedLedgerInterceptor; | ||
| private ManagedLedgerInterceptor managedLedgerPayloadProcessor; |
There was a problem hiding this comment.
If we're using the same interface, couldn't we have a single managedLedgerInterceptor instance>
| if (iterator.hasNext()) { | ||
| LedgerEntry ledgerEntry = iterator.next(); | ||
| EntryImpl returnEntry = EntryImpl.create(ledgerEntry); | ||
| EntryImpl returnEntry = EntryImpl.create(ledgerEntry, ml.getConfig().getManagedLedgerPayloadProcessor()); |
There was a problem hiding this comment.
Maybe we can store the interceptor as a member variable of EntryCacheImpl.
| public void consumerCreated(ServerCnx cnx, | ||
| Consumer consumer, | ||
| Map<String, String> metadata) { | ||
| for (BrokerInterceptorWithClassLoader value : interceptors.values()) { |
There was a problem hiding this comment.
I know this was already the style of other methods here, though we should try to avoid the allocation of an iterator instance in the case where there are no interceptors. eg:
if (interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
// ....
}There was a problem hiding this comment.
+1 please check null or empty first.
| CompletableFuture<Void> promise = new CompletableFuture<>(); | ||
| promise.complete(null); | ||
| return promise; |
There was a problem hiding this comment.
| CompletableFuture<Void> promise = new CompletableFuture<>(); | |
| promise.complete(null); | |
| return promise; | |
| return CompletableFuture.completedFuture(null); |
There was a problem hiding this comment.
N/A with the latest changes. This class doesn't exist anymore
| } | ||
|
|
||
| private static final class MessagePublishContext implements PublishContext, Runnable { | ||
| Map<String, Object> propertyMap = new HashMap<>(); |
There was a problem hiding this comment.
It's not clear why we need to use the property map here and we should try to avoid the allocation if it's not being used.
There was a problem hiding this comment.
Done. propertyMap is created only when needed
| final long consumerId = ack.getConsumerId(); | ||
|
|
||
| if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { | ||
| Consumer consumer = consumerFuture.getNow(null); |
There was a problem hiding this comment.
Since we have the consumer variable here, we can also change the next line to avoid doing getNow() again.
| * @param extraClientHeaders | ||
| * @return | ||
| */ | ||
| PulsarAdminBuilder extraClientHeaders(String extraClientHeaders); |
There was a problem hiding this comment.
I think this change should go in a separate PR since it's a bit different from the payload interceptor change.
Also, it's not clear from API perspective how the extra headers are going to be used and what is the format of that string is.
There was a problem hiding this comment.
N/A anymore. Removed from the changeset
| if(cacheBuf != null && cacheBuf != entry.data) | ||
| entry.data = cacheBuf; | ||
| else | ||
| entry.data.retain(); |
There was a problem hiding this comment.
Instead of checking the equality of the buffer instance, the interceptor should have a well defined semantic on the expectation of the reference count of the returned buffer.
| * @param ledgerData data from ledger | ||
| * @return data after processing, if any | ||
| */ | ||
| default ByteBuf beforeCacheEntryFromLedger(ByteBuf ledgerData){ |
There was a problem hiding this comment.
I think we need to be very explicit here (and careful :) ) on the reference-count expectations here for the buffers, to avoid memory leaks or use-after-destroyed scenarios.
There was a problem hiding this comment.
Agree. The interface is better defined and documented now
| } | ||
|
|
||
| @Override | ||
| public boolean isSuperUser(PulsarService pulsarService, String appId, String originalPrincipal){ |
There was a problem hiding this comment.
I think this would be more suitable to custom authorization provider, where there is already the isSuperUser() method.
There was a problem hiding this comment.
Removed from the changeset
Created a feature request #12752 |
|
@madhavan-narayanan Thanks. |
|
@hangc0276 , @codelipenghui , @merlimat , @Jason918 |
…de cluster better control and flexibility (apache#12536) Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *
…de cluster better control and flexibility (#12536) Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java * (cherry picked from commit 03bbc8e)
…de cluster better control and flexibility (#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java * (cherry picked from commit 03bbc8e)
|
Hi @madhavan-narayanan, thanks for your great contribution! I see you marked this PR with the To figure out what content we should add, we (+@hangc0276) have gone through the PR changes and figured out the general idea (it adds some inceptors to brokers to better process messages), but we do not know the details. Since you're the author, can you please add docs or provide technical inputs? ==================== To help you quickly get started, here are some thoughts on doc inputs:
==================== cc @D-2-Ed |
…de cluster better control and flexibility (apache#12536) ### Motivation Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this ### Modifications - Enhanced org.apache.pulsar.broker.intercept.BrokerInterceptor interface to include additional events for tracing - Created a new interface org.apache.pulsar.common.intercept.MessagePayloadProcessor to allow interception of ledger write/read operations - Enhanced PulsarAdmin to give operators a control in managing super-users ### Verifying this change - [x ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - *Added new test cases to MangedLedgerInterceptorImplTest.java and BrokerInterceptorTest.java *

Motivation
Operators of enterprise Pulsar cluster(s) would need the flexibility and control to intercept broker events (including ledger writes/reads) for template validations, observability and access control. This changeset contains enhancements to existing interceptor mechanism to support this
Modifications
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
doc-required(If you need help on updating docs, create a doc issue)
no-need-doc(Please explain why)
doc(If this PR contains doc changes)