Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private void cacheEvictionTask() {
double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
long waitTimeMillis = (long) (1000 / evictionFrequency);

while (true) {
while (!closed) {
try {
doCacheEviction();

Expand Down Expand Up @@ -514,6 +514,7 @@ public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {

statsTask.cancel(true);
flushCursorsTask.cancel(true);
cacheEvictionExecutor.shutdownNow();

List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet());
List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size());
Expand Down Expand Up @@ -594,7 +595,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}
}));
cacheEvictionExecutor.shutdownNow();
entryCacheManager.clear();
return FutureUtil.waitForAll(futures);
}
Expand All @@ -608,6 +608,7 @@ public void shutdown() throws InterruptedException, ManagedLedgerException {

statsTask.cancel(true);
flushCursorsTask.cancel(true);
cacheEvictionExecutor.shutdownNow();

// take a snapshot of ledgers currently in the map to prevent race conditions
List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
Expand Down Expand Up @@ -651,7 +652,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}

scheduledExecutor.shutdownNow();
cacheEvictionExecutor.shutdownNow();

entryCacheManager.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,13 @@ void setup() throws Exception {
void tearDown() throws Exception {
try {
for (int i = 0; i < BROKER_COUNT; i++) {
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}
if (pulsarAdmins[i] != null) {
pulsarAdmins[i].close();
}
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}

}
bkEnsemble.stop();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ void shutdown() throws Exception {
log.info("--- Shutting down ---");
try {
functionAdmin.close();
bkEnsemble.stop();
workerServer.stop();
functionsWorkerService.stop();
workerServer.stop();
bkEnsemble.stop();
} finally {
if (tempDirectory != null) {
tempDirectory.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,10 @@ public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);

for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
topicsToColumnHandles.get(split.getKey()), split.getValue(),
pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(),
new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory));
new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory);
this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
}
}
Expand Down