Skip to content

[fix][broker] Execute per-topic entry filters with the same classloader#19364

Merged
nicoloboschi merged 3 commits intoapache:masterfrom
nicoloboschi:fix-entry-filters
Feb 1, 2023
Merged

[fix][broker] Execute per-topic entry filters with the same classloader#19364
nicoloboschi merged 3 commits intoapache:masterfrom
nicoloboschi:fix-entry-filters

Conversation

@nicoloboschi
Copy link
Contributor

@nicoloboschi nicoloboschi commented Jan 30, 2023

Fixes #19361

Motivation

Currently the entry filters per-topic/per-namespace are recreated from scratch (search in fs, unzip nar, classloader creation) every time a topic with entry filters enabled is created/updated.
This leads to multiple issues:

  1. The topic creation path is affected by the time for looking up the file system and the unpacking (cpu intensive)
  2. Each topic has his own NarClassLoader instance that has a relevant memory footprint impact

Modifications

  1. Made the EntryFilterProvider a stateful class. In the constructor all the entry filter definitions and NarClassLoader are loaded, together with the broker entry filters
  2. When the topic is created, using the same NarClassLoader a new EntryFilterWithClassLoader instance is created.

One different thing from before is that now the same entry filter class is loaded once. This means if a entry filter relies on static variables, those point to the same memory address for each EntryFilter invocation, even for different topics.
I think it's acceptable because:

  1. This feature has never been released so it's not a real breaking change
  2. The behaviour of EntryFilter is consistent with the broker ones. For the developer this means to not have to think to different contexts based on how the filter is deployed.

Other things in the pull request:

  1. Moved the entryFilter update in AbstractTopic to the initialize method
  2. Improved the entry filters override logic where, if the broker denied to override the broker entry filters but not filters were configured, then the topic's filter was used. This is wrong since the Pulsar admin doesn't expect any filter to be executed if allowOverrideEntryFilters is false
  3. Added test about execution ordering of entry filters, if more than one. This is very important since it can cause performance regression based on the chain of entry filters configured.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 30, 2023
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work @nicoloboschi . I have a comment about EntryFilterWithClassLoader.

}
}

public List<EntryFilterWithClassLoader> loadEntryFiltersForPolicy(EntryFilters policy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original implementation, EntryFilterWithClassLoader is unnecessarily used as a top level concept. I think that this is a bad approach. It's sufficient to use EntryFilter.

Please refactor the solution to hide EntryFilterWithClassLoader. It's possible that it could be removed completely.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we need to keep a reference to the NarClassLoader, this is a pattern we have in other plugins that can be loaded from a Nar file

Comment on lines 494 to 530
EntryFilterWithClassLoader mockFilterReject = mock(EntryFilterWithClassLoader.class);
when(mockFilterReject.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.REJECT);
EntryFilterWithClassLoader mockFilterAccept = mock(EntryFilterWithClassLoader.class);
when(mockFilterAccept.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.ACCEPT);
if (overrideBrokerEntryFilters) {
setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topic).get(), List.of(mockFilterReject, mockFilterAccept));
} else {
setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topic).get(), List.of());
setMockBrokerFilter(List.of(mockFilterReject, mockFilterAccept));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we need to get rid of this type of mocking. It's not that this PR has introduced this approach.

It would be useful it the EntryFilterProvider could be overridden in tests. It becomes easier if EntryFilterProvider is a simple interface and the existing EntryFilterProvider is renamed to EntryFilterProviderImpl or DefaultEntryFilterProvider.

Instead of injecting fields and doing other hacks, filters could simply be tested by passing an EntryFilterProvider in tests.

@lhotari
Copy link
Member

lhotari commented Jan 31, 2023

Currently the entry filters per-topic/per-namespace are recreated from scratch (search in fs, unzip nar, classloader creation) every time a topic with entry filters enabled is created/updated.

One clarification to this: this applies to all entry filters in the master branch. The master branch has conflicting changes, but it will also load per-broker filters using the same logic as per-topic filters. The filters just won't get used at all unless allowOverrideEntryFilters=true.

The broker config level value gets set to the policies here:

topicPolicies.getEntryFilters().updateBrokerValue(new EntryFilters(String.join(",",
config.getEntryFilterNames())));

Since #19361 applies to all entry filter usage, I'd suggest renaming the title of this PR.

Comment on lines 146 to 161
private void setMockFilterToTopic(PersistentTopic topicRef, List<EntryFilterWithClassLoader> mockFilter) throws NoSuchFieldException, IllegalAccessException {
Field field = topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
field.setAccessible(true);
field.set(topicRef, mockFilter);
}

private void setMockBrokerFilter(List<EntryFilterWithClassLoader> mockFilter) throws NoSuchFieldException, IllegalAccessException {
Field field2 = pulsar.getBrokerService().getEntryFilterProvider()
.getClass().getDeclaredField("brokerEntryFilters");
field2.setAccessible(true);
field2.set(pulsar.getBrokerService().getEntryFilterProvider(), mockFilter);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope that there's a way to get rid of these as explained in my other comment.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hangc0276 please take a look

}
}

public List<EntryFilterWithClassLoader> loadEntryFiltersForPolicy(EntryFilters policy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we need to keep a reference to the NarClassLoader, this is a pattern we have in other plugins that can be loaded from a Nar file

@lhotari
Copy link
Member

lhotari commented Jan 31, 2023

Unfortunately we need to keep a reference to the NarClassLoader, this is a pattern we have in other plugins that can be loaded from a Nar file

@eolivelli That can be achieved without making EntryFilterWithClassLoader the top level type. The top level type should be simply EntryFilter. It's an implementation detail that doesn't need to be exposed if there happens to be such behavior that the NarClassloader gets closed when the entry filter gets closed.

@nicoloboschi
Copy link
Contributor Author

@lhotari @eolivelli I've updated the pull:

  • moved the internal entriesFilter update on policies change callback
  • moved the top level class to EntryFilter
  • created a mock version of EntryFilterProvider. unfortunately the test class is not using ´TestPulsarService´ (yet [improve][test] Migrate tests to use PulsarTestContext #19376) so I had to inject it in the brokerService.

For the test part, I'm planning to improve the mocking part after #19376 gets merged, does it make sense to you @lhotari ?

@lhotari
Copy link
Member

lhotari commented Jan 31, 2023

For the test part, I'm planning to improve the mocking part after #19376 gets merged, does it make sense to you @lhotari ?

@nicoloboschi sounds good!

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work @nicoloboschi

@nicoloboschi
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work

I left some additional comments

});
} catch (PulsarServerException e) {
log.warn("Failed to create topic {}-{}", topic, e.getMessage());
} catch (Throwable e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching Throwable is usually a code smell (and you should always deal with InterruptedException unfortunately.

Maybe we can move to RuntimeException here


this.updateResourceGroupLimiter(Optional.of(data));
final Optional<Policies> optionalData = Optional.of(data);
this.updateResourceGroupLimiter(optionalData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change seems unrelated

builder.put(filterName, filter);
}
log.info("Successfully loaded entry filter for name `{}`", filterName);
final EntryFilter filterWithClassLoader = load(metaData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to "entryFilter"

}

private static String classLoaderKey(Path archivePath) {
return archivePath.toFile().getAbsolutePath();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is archivePath.toFile().getAbsolutePath() potentially performing some IO operation ? (metadata access?)

probably archivePath.toString() is enough in this context


@Slf4j
@ToString
public class EntryFilterWithClassLoader implements EntryFilter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this class ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's the implementation of EntryFilter

@VisibleForTesting
protected Map<String, EntryFilterMetaData> definitions;
@VisibleForTesting
protected Map<String, NarClassLoader> cachedClassLoaders;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it required that we refer to NarClassLoader here ? can it be simply a "ClassLoader" ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no because we need to get the ServiceDefinition file from the classloader, so a NarClassLoader is required

meta.setDefinition(def);
meta.setArchivePath(Path.of(name));
definitions.put(name, meta);
final NarClassLoader ncl = mock(NarClassLoader.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if cachedClassLoaders is not required to old " NarClassLoader" entries that maybe this code can be simplified a lot.
and you can create a new URLClassloader with Thread.currentThread().getContextClassLoader() as parent
and so we continue to remove mocks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my previous comment

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@nicoloboschi nicoloboschi merged commit a4c3034 into apache:master Feb 1, 2023
@nicoloboschi nicoloboschi added this to the 3.0.0 milestone Feb 1, 2023
@nicoloboschi nicoloboschi self-assigned this Feb 1, 2023
BewareMyPower added a commit to BewareMyPower/kop that referenced this pull request Mar 6, 2023
### Modifications

- Upgrade the Pulsar dependency to 3.0.0.1-SNAPSHOT
- Use `List<EntryFilter>` from
  apache/pulsar#19364 as the filters
- Remove the removed APIs in `AuthorizationService`
- Use `TopicType` instead of String.
BewareMyPower added a commit to BewareMyPower/kop that referenced this pull request Mar 6, 2023
### Modifications

- Upgrade the Pulsar dependency to 3.0.0.1-SNAPSHOT
- Use `List<EntryFilter>` from
  apache/pulsar#19364 as the filters
- Remove the removed APIs in `AuthorizationService`
- Use `TopicType` instead of String.
Demogorgon314 added a commit to streamnative/kop that referenced this pull request Mar 7, 2023
### Modifications

- Upgrade the Pulsar dependency to 3.0.0.1-SNAPSHOT
- Use `List<EntryFilter>` from
apache/pulsar#19364 as the filters
- Remove the removed APIs in `AuthorizationService`
- Use `TopicType` instead of String.
- Fix authenticator failure caused by
apache/pulsar#19295

Co-authored-by: Demogorgon314 <kwang@streamnative.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs ready-to-test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Entry filters per topic/namespace load and unpack NAR files for each topic created

3 participants