diff --git a/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java b/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java index 7896eeffae4..0a7a5ac3a76 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java +++ b/framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java @@ -9,6 +9,8 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; + +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.bouncycastle.util.encoders.Hex; import org.pf4j.CompoundPluginDescriptorFinder; @@ -31,6 +33,8 @@ public class EventPluginLoader { private static EventPluginLoader instance; + private long MAX_PENDING_SIZE = 50000; + private PluginManager pluginManager = null; private List eventListeners; @@ -73,6 +77,7 @@ public class EventPluginLoader { private FilterQuery filterQuery; + @Getter private boolean useNativeQueue = false; public static EventPluginLoader getInstance() { @@ -537,6 +542,21 @@ public void postContractEventTrigger(ContractEventTrigger trigger) { } } + public boolean isBusy() { + if (useNativeQueue) { + return false; + } + int queueSize = 0; + for (IPluginEventListener listener : eventListeners) { + try { + queueSize += listener.getPendingSize(); + } catch (AbstractMethodError error) { + break; + } + } + return queueSize >= MAX_PENDING_SIZE; + } + private String toJsonString(Object data) { String jsonData = ""; diff --git a/framework/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java b/framework/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java index e1ef5b59536..63e23d18842 100644 --- a/framework/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java +++ b/framework/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java @@ -13,6 +13,8 @@ public interface IPluginEventListener extends ExtensionPoint { // start should be called after setServerAddress, setTopic, setDBConfig void start(); + int getPendingSize(); + void handleBlockEvent(Object trigger); void handleTransactionTrigger(Object trigger); diff --git a/framework/src/main/java/org/tron/core/services/event/BlockEventLoad.java b/framework/src/main/java/org/tron/core/services/event/BlockEventLoad.java index 382973ccfa5..75829bf0e5e 100644 --- a/framework/src/main/java/org/tron/core/services/event/BlockEventLoad.java +++ b/framework/src/main/java/org/tron/core/services/event/BlockEventLoad.java @@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.logsfilter.EventPluginLoader; import org.tron.core.db.Manager; import org.tron.core.services.event.bo.BlockEvent; import org.tron.core.services.event.bo.Event; @@ -26,13 +27,19 @@ public class BlockEventLoad { @Autowired private BlockEventGet blockEventGet; + private EventPluginLoader instance = EventPluginLoader.getInstance(); + private final ScheduledExecutorService executor = ExecutorServiceManager .newSingleThreadScheduledExecutor("event-load"); + private long MAX_LOAD_NUM = 100; + public void init() { executor.scheduleWithFixedDelay(() -> { try { - load(); + if (!instance.isBusy()) { + load(); + } } catch (Exception e) { close(); logger.error("Event load service fail.", e); @@ -62,6 +69,9 @@ public synchronized void load() throws Exception { if (cacheHeadNum >= tmpNum) { return; } + if (tmpNum > cacheHeadNum + MAX_LOAD_NUM) { + tmpNum = cacheHeadNum + MAX_LOAD_NUM; + } List l1 = new ArrayList<>(); List l2 = new ArrayList<>(); BlockEvent tmp = BlockEventCache.getHead(); diff --git a/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java b/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java index 12bdbcc3c15..8f79ee47a3c 100644 --- a/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java +++ b/framework/src/main/java/org/tron/core/services/event/HistoryEventService.java @@ -55,12 +55,17 @@ private void syncEvent() { long tmp = instance.getStartSyncBlockNum(); long endNum = manager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum(); while (tmp <= endNum) { + if (instance.isUseNativeQueue()) { + Thread.sleep(20); + } else if (instance.isBusy()) { + Thread.sleep(100); + continue; + } BlockEvent blockEvent = blockEventGet.getBlockEvent(tmp); realtimeEventService.flush(blockEvent, false); solidEventService.flush(blockEvent); tmp++; endNum = manager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum(); - Thread.sleep(30); } initEventService(manager.getChainBaseManager().getBlockIdByNum(endNum)); } catch (InterruptedException e1) { diff --git a/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java b/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java new file mode 100644 index 00000000000..658d42f38d9 --- /dev/null +++ b/framework/src/test/java/org/tron/core/event/EventPluginLoaderTest.java @@ -0,0 +1,43 @@ +package org.tron.core.event; + +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.tron.common.logsfilter.EventPluginLoader; +import org.tron.common.logsfilter.IPluginEventListener; +import org.tron.common.utils.ReflectUtils; + +public class EventPluginLoaderTest { + + @Test + public void testIsBusy() { + + EventPluginLoader eventPluginLoader = EventPluginLoader.getInstance(); + ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", true); + boolean flag = eventPluginLoader.isBusy(); + Assert.assertFalse(flag); + + ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", false); + + IPluginEventListener p1 = mock(IPluginEventListener.class); + List list = new ArrayList<>(); + list.add(p1); + ReflectUtils.setFieldValue(eventPluginLoader, "eventListeners", list); + + Mockito.when(p1.getPendingSize()).thenReturn(100); + flag = eventPluginLoader.isBusy(); + Assert.assertFalse(flag); + + Mockito.when(p1.getPendingSize()).thenReturn(60000); + flag = eventPluginLoader.isBusy(); + Assert.assertTrue(flag); + + Mockito.when(p1.getPendingSize()).thenThrow(new AbstractMethodError()); + flag = eventPluginLoader.isBusy(); + Assert.assertFalse(flag); + } +} diff --git a/framework/src/test/java/org/tron/core/event/HistoryEventServiceTest.java b/framework/src/test/java/org/tron/core/event/HistoryEventServiceTest.java index 7a50c8a0c35..49f77ccf597 100644 --- a/framework/src/test/java/org/tron/core/event/HistoryEventServiceTest.java +++ b/framework/src/test/java/org/tron/core/event/HistoryEventServiceTest.java @@ -26,6 +26,9 @@ public class HistoryEventServiceTest { @Test public void test() throws Exception { EventPluginLoader instance = mock(EventPluginLoader.class); + Mockito.when(instance.isUseNativeQueue()).thenReturn(true); + Mockito.when(instance.isUseNativeQueue()).thenReturn(false); + ReflectUtils.setFieldValue(historyEventService, "instance", instance); DynamicPropertiesStore dynamicPropertiesStore = mock(DynamicPropertiesStore.class); @@ -39,6 +42,7 @@ public void test() throws Exception { SolidEventService solidEventService = new SolidEventService(); RealtimeEventService realtimeEventService = new RealtimeEventService(); BlockEventLoad blockEventLoad = new BlockEventLoad(); + ReflectUtils.setFieldValue(blockEventLoad, "instance", instance); ReflectUtils.setFieldValue(historyEventService, "solidEventService", solidEventService); ReflectUtils.setFieldValue(historyEventService, "realtimeEventService", realtimeEventService); @@ -77,11 +81,16 @@ public void test() throws Exception { Mockito.when(chainBaseManager.getBlockIdByNum(1L)) .thenReturn(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1)); + Mockito.when(instance.isUseNativeQueue()).thenReturn(true); + Method method1 = historyEventService.getClass().getDeclaredMethod("syncEvent"); method1.setAccessible(true); method1.invoke(historyEventService); + Mockito.when(instance.isUseNativeQueue()).thenReturn(false); + Mockito.when(instance.isBusy()).thenReturn(true); historyEventService.init(); + Thread.sleep(1000); historyEventService.close(); } }