Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.util.encoders.Hex;
import org.pf4j.CompoundPluginDescriptorFinder;
Expand All @@ -31,6 +33,8 @@ public class EventPluginLoader {

private static EventPluginLoader instance;

private long MAX_PENDING_SIZE = 50000;

private PluginManager pluginManager = null;

private List<IPluginEventListener> eventListeners;
Expand Down Expand Up @@ -73,6 +77,7 @@ public class EventPluginLoader {

private FilterQuery filterQuery;

@Getter
private boolean useNativeQueue = false;

public static EventPluginLoader getInstance() {
Expand Down Expand Up @@ -537,6 +542,21 @@ public void postContractEventTrigger(ContractEventTrigger trigger) {
}
}

public boolean isBusy() {
if (useNativeQueue) {
return false;
}
int queueSize = 0;
for (IPluginEventListener listener : eventListeners) {
try {
queueSize += listener.getPendingSize();
} catch (AbstractMethodError error) {
break;
}
}
return queueSize >= MAX_PENDING_SIZE;
}

private String toJsonString(Object data) {
String jsonData = "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public interface IPluginEventListener extends ExtensionPoint {
// start should be called after setServerAddress, setTopic, setDBConfig
void start();

int getPendingSize();

void handleBlockEvent(Object trigger);

void handleTransactionTrigger(Object trigger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.core.db.Manager;
import org.tron.core.services.event.bo.BlockEvent;
import org.tron.core.services.event.bo.Event;
Expand All @@ -26,13 +27,19 @@ public class BlockEventLoad {
@Autowired
private BlockEventGet blockEventGet;

private EventPluginLoader instance = EventPluginLoader.getInstance();

private final ScheduledExecutorService executor = ExecutorServiceManager
.newSingleThreadScheduledExecutor("event-load");

private long MAX_LOAD_NUM = 100;

public void init() {
executor.scheduleWithFixedDelay(() -> {
try {
load();
if (!instance.isBusy()) {
load();
}
} catch (Exception e) {
close();
logger.error("Event load service fail.", e);
Expand Down Expand Up @@ -62,6 +69,9 @@ public synchronized void load() throws Exception {
if (cacheHeadNum >= tmpNum) {
return;
}
if (tmpNum > cacheHeadNum + MAX_LOAD_NUM) {
tmpNum = cacheHeadNum + MAX_LOAD_NUM;
}
List<BlockEvent> l1 = new ArrayList<>();
List<BlockEvent> l2 = new ArrayList<>();
BlockEvent tmp = BlockEventCache.getHead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ private void syncEvent() {
long tmp = instance.getStartSyncBlockNum();
long endNum = manager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
while (tmp <= endNum) {
if (instance.isUseNativeQueue()) {
Thread.sleep(20);
} else if (instance.isBusy()) {
Thread.sleep(100);
continue;
}
BlockEvent blockEvent = blockEventGet.getBlockEvent(tmp);
realtimeEventService.flush(blockEvent, false);
solidEventService.flush(blockEvent);
tmp++;
endNum = manager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
Thread.sleep(30);
}
initEventService(manager.getChainBaseManager().getBlockIdByNum(endNum));
} catch (InterruptedException e1) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.tron.core.event;

import static org.mockito.Mockito.mock;

import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.common.logsfilter.IPluginEventListener;
import org.tron.common.utils.ReflectUtils;

public class EventPluginLoaderTest {

@Test
public void testIsBusy() {

EventPluginLoader eventPluginLoader = EventPluginLoader.getInstance();
ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", true);
boolean flag = eventPluginLoader.isBusy();
Assert.assertFalse(flag);

ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", false);

IPluginEventListener p1 = mock(IPluginEventListener.class);
List<IPluginEventListener> list = new ArrayList<>();
list.add(p1);
ReflectUtils.setFieldValue(eventPluginLoader, "eventListeners", list);

Mockito.when(p1.getPendingSize()).thenReturn(100);
flag = eventPluginLoader.isBusy();
Assert.assertFalse(flag);

Mockito.when(p1.getPendingSize()).thenReturn(60000);
flag = eventPluginLoader.isBusy();
Assert.assertTrue(flag);

Mockito.when(p1.getPendingSize()).thenThrow(new AbstractMethodError());
flag = eventPluginLoader.isBusy();
Assert.assertFalse(flag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class HistoryEventServiceTest {
@Test
public void test() throws Exception {
EventPluginLoader instance = mock(EventPluginLoader.class);
Mockito.when(instance.isUseNativeQueue()).thenReturn(true);
Mockito.when(instance.isUseNativeQueue()).thenReturn(false);

ReflectUtils.setFieldValue(historyEventService, "instance", instance);

DynamicPropertiesStore dynamicPropertiesStore = mock(DynamicPropertiesStore.class);
Expand All @@ -39,6 +42,7 @@ public void test() throws Exception {
SolidEventService solidEventService = new SolidEventService();
RealtimeEventService realtimeEventService = new RealtimeEventService();
BlockEventLoad blockEventLoad = new BlockEventLoad();
ReflectUtils.setFieldValue(blockEventLoad, "instance", instance);

ReflectUtils.setFieldValue(historyEventService, "solidEventService", solidEventService);
ReflectUtils.setFieldValue(historyEventService, "realtimeEventService", realtimeEventService);
Expand Down Expand Up @@ -77,11 +81,16 @@ public void test() throws Exception {
Mockito.when(chainBaseManager.getBlockIdByNum(1L))
.thenReturn(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1));

Mockito.when(instance.isUseNativeQueue()).thenReturn(true);

Method method1 = historyEventService.getClass().getDeclaredMethod("syncEvent");
method1.setAccessible(true);
method1.invoke(historyEventService);

Mockito.when(instance.isUseNativeQueue()).thenReturn(false);
Mockito.when(instance.isBusy()).thenReturn(true);
historyEventService.init();
Thread.sleep(1000);
historyEventService.close();
}
}