Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import org.bitrepository.common.settings.Settings;
import org.bitrepository.common.utils.SettingsUtils;
import org.bitrepository.common.utils.TimeUtils;
import org.bitrepository.common.utils.XmlUtils;
import org.bitrepository.service.AlarmDispatcher;
import org.bitrepository.settings.repositorysettings.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -52,7 +54,7 @@ public class AuditTrailCollector {
/**
* Initial grace period in milliseconds after startup to allow the system to finish startup.
*/
private static final int DEFAULT_GRACE_PERIOD = 0;
private static final Duration DEFAULT_GRACE_PERIOD = Duration.ZERO;

/**
* @param settings The settings for this collector.
Expand All @@ -70,7 +72,10 @@ public AuditTrailCollector(Settings settings, AuditTrailClient client, AuditTrai

this.settings = settings;
this.timer = new Timer(true);
long collectionInterval = settings.getReferenceSettings().getAuditTrailServiceSettings().getCollectAuditInterval();
javax.xml.datatype.Duration collectAuditInterval =
Comment thread
ole-v-v marked this conversation as resolved.
settings.getReferenceSettings().getAuditTrailServiceSettings().getCollectAuditInterval();
Duration collectionInterval = XmlUtils.xmlDurationToDuration(collectAuditInterval);
long collectionIntervalMillis = collectionInterval.toMillis();

for (Collection c : settings.getRepositorySettings().getCollections().getCollection()) {
IncrementalCollector collector = new IncrementalCollector(c.getID(),
Expand All @@ -79,11 +84,11 @@ public AuditTrailCollector(Settings settings, AuditTrailClient client, AuditTrai
SettingsUtils.getMaxClientPageSize(),
alarmDispatcher);
AuditTrailCollectionTimerTask collectorTask = new AuditTrailCollectionTimerTask(
collector, collectionInterval, getGracePeriod());
collector, collectionIntervalMillis, Math.toIntExact(getGracePeriod().toMillis()));
log.info("Will start collection of audit trail every " +
TimeUtils.millisecondsToHuman(collectionInterval) + ", " +
"after a grace period of " + TimeUtils.millisecondsToHuman(getGracePeriod()));
timer.scheduleAtFixedRate(collectorTask, getGracePeriod(), collectionInterval / 10);
TimeUtils.durationToHuman(collectionInterval) + " " +
"after a grace period of " + TimeUtils.durationToHuman(getGracePeriod()));
timer.scheduleAtFixedRate(collectorTask, getGracePeriod().toMillis(), collectionIntervalMillis / 10);
collectorTasks.put(c.getID(), collectorTask);
}
}
Expand Down Expand Up @@ -124,9 +129,11 @@ public void collectNewestAudits(String collectionID) {
* @return The time to wait before starting collection of audit trails. This enables the system to have time to
* finish startup before they have to start delivering/process audit trails.
*/
private int getGracePeriod() {
private Duration getGracePeriod() {
if (settings.getReferenceSettings().getAuditTrailServiceSettings().isSetGracePeriod()) {
return settings.getReferenceSettings().getAuditTrailServiceSettings().getGracePeriod().intValue();
javax.xml.datatype.Duration gracePeriod =
settings.getReferenceSettings().getAuditTrailServiceSettings().getGracePeriod();
return XmlUtils.xmlDurationToDuration(gracePeriod);
} else {
return DEFAULT_GRACE_PERIOD;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.bitrepository.common.utils.FileUtils;
import org.bitrepository.common.utils.SettingsUtils;
import org.bitrepository.common.utils.TimeUtils;
import org.bitrepository.common.utils.XmlUtils;
import org.bitrepository.modify.putfile.BlockingPutFileClient;
import org.bitrepository.modify.putfile.PutFileClient;
import org.bitrepository.protocol.CoordinationLayerException;
Expand All @@ -50,10 +51,13 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;

Expand Down Expand Up @@ -119,13 +123,14 @@ public void start() {
log.debug("Cancelling old timer.");
timer.cancel();
}
long preservationInterval = preservationSettings.getAuditTrailPreservationInterval();
long timerCheckInterval = preservationInterval / 10;
javax.xml.datatype.Duration preservationIntervalXmlDur = preservationSettings.getAuditTrailPreservationInterval();
Duration preservationInterval = XmlUtils.xmlDurationToDuration(preservationIntervalXmlDur);
long timerCheckIntervalMillis = preservationInterval.dividedBy(10).toMillis();
log.info("Instantiating the preservation of audit trails every {}",
TimeUtils.millisecondsToHuman(preservationInterval));
TimeUtils.durationToHuman(preservationInterval));
timer = new Timer(true);
preservationTask = new AuditPreservationTimerTask(preservationInterval);
timer.scheduleAtFixedRate(preservationTask, timerCheckInterval, timerCheckInterval);
preservationTask = new AuditPreservationTimerTask(preservationInterval.toMillis());
timer.scheduleAtFixedRate(preservationTask, timerCheckIntervalMillis, timerCheckIntervalMillis);
}

@Override
Expand Down Expand Up @@ -264,6 +269,7 @@ private class AuditPreservationTimerTask extends TimerTask {
/**
* @param interval The interval between running this timer task.
*/
// TODO: Replace old time representation (https://sbforge.org/jira/browse/BITMAG-1180)
private AuditPreservationTimerTask(long interval) {
this.schedule = new TimerTaskSchedule(interval, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.xml.datatype.DatatypeFactory;
import java.util.concurrent.ThreadFactory;

import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -74,11 +75,14 @@ public void setup() throws Exception {
@Test(groups = {"unstable"})
public void auditTrailServiceTest() throws Exception {
addDescription("Test the Audit Trail Service");
DatatypeFactory factory = DatatypeFactory.newInstance();
settings.getRepositorySettings().getGetAuditTrailSettings().getNonPillarContributorIDs().clear();
settings.getRepositorySettings().getGetAuditTrailSettings().getNonPillarContributorIDs().add(DEFAULT_CONTRIBUTOR);
settings.getReferenceSettings().getAuditTrailServiceSettings().setCollectAuditInterval(800);
settings.getReferenceSettings().getAuditTrailServiceSettings()
.setCollectAuditInterval(factory.newDuration(800));
settings.getReferenceSettings().getAuditTrailServiceSettings().setTimerTaskCheckInterval(100L);
settings.getReferenceSettings().getAuditTrailServiceSettings().setGracePeriod(800L);
settings.getReferenceSettings().getAuditTrailServiceSettings()
.setGracePeriod(factory.newDuration(800));

AuditTrailStore store = mock(AuditTrailStore.class);
AuditTrailClient client = mock(AuditTrailClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.xml.datatype.DatatypeFactory;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
Expand All @@ -65,11 +67,12 @@ public void setup() throws Exception {
@Test(groups = {"regressiontest"})
public void auditCollectorIntervalTest() throws Exception {
addDescription("Test that the collector calls the AuditClient at the correct intervals.");
DatatypeFactory factory = DatatypeFactory.newInstance();
settings.getRepositorySettings().getGetAuditTrailSettings().getNonPillarContributorIDs().clear();
settings.getRepositorySettings().getGetAuditTrailSettings().getNonPillarContributorIDs().add(DEFAULT_CONTRIBUTOR);
settings.getReferenceSettings().getAuditTrailServiceSettings().setCollectAuditInterval(800);
settings.getReferenceSettings().getAuditTrailServiceSettings().setCollectAuditInterval(factory.newDuration(800));
settings.getReferenceSettings().getAuditTrailServiceSettings().setTimerTaskCheckInterval(100L);
settings.getReferenceSettings().getAuditTrailServiceSettings().setGracePeriod(800L);
settings.getReferenceSettings().getAuditTrailServiceSettings().setGracePeriod(factory.newDuration(800));

SettingsUtils.initialize(settings);
AuditTrailClient client = mock(AuditTrailClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import java.io.FileInputStream;
import java.net.URL;
import java.sql.Date;
Expand Down Expand Up @@ -90,7 +92,9 @@ public void auditPreservationSchedulingTest() throws Exception {
MockPutClient client = new MockPutClient();

settings.getReferenceSettings().getAuditTrailServiceSettings().setTimerTaskCheckInterval(100);
settings.getReferenceSettings().getAuditTrailServiceSettings().getAuditTrailPreservation().setAuditTrailPreservationInterval(300);
Duration interval = DatatypeFactory.newInstance().newDuration(300);
settings.getReferenceSettings().getAuditTrailServiceSettings().getAuditTrailPreservation()
.setAuditTrailPreservationInterval(interval);
settings.getRepositorySettings().getGetAuditTrailSettings().getNonPillarContributorIDs().clear();
settings.getRepositorySettings().getGetAuditTrailSettings().getNonPillarContributorIDs().add(PILLAR_ID);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.bitrepository.client.eventhandler.OperationFailedEvent;
import org.bitrepository.common.DefaultThreadFactory;
import org.bitrepository.common.settings.Settings;
import org.bitrepository.common.utils.TimeUtils;
import org.bitrepository.common.utils.XmlUtils;
import org.bitrepository.protocol.MessageContext;
import org.bitrepository.protocol.messagebus.MessageBus;
import org.bitrepository.protocol.messagebus.MessageBusManager;
import org.bitrepository.protocol.security.SecurityManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -66,8 +70,9 @@ public class CollectionBasedConversationMediator implements ConversationMediator
@Override
public void start() {
messagebus.addListener(settings.getReceiverDestinationID(), this);
cleanTimer.scheduleAtFixedRate(new ConversationCleaner(), 0,
settings.getReferenceSettings().getClientSettings().getMediatorCleanupInterval().longValue());
javax.xml.datatype.Duration cleanupInterval = settings.getReferenceSettings().getClientSettings().getMediatorCleanupInterval();
cleanTimer.scheduleAtFixedRate(new ConversationCleaner(),
0, XmlUtils.xmlDurationToMilliseconds(cleanupInterval));
}

@Override
Expand Down Expand Up @@ -136,7 +141,7 @@ public void onMessage(Message message, MessageContext messageContext) {
}

/**
* Will clean out obsolete conversations in each run. An obsolete conversation is a conversation which satisfies any
* Will clean out obsolete conversations in each run. An obsolete conversation is a conversation which satisfies one
* of the following criteria: <ol>
* <li> Returns true for the <code>hasEnded()</code> method.
* <li> Is older than the conversationTimeout limit allows.
Expand All @@ -149,15 +154,20 @@ private final class ConversationCleaner extends TimerTask {
@Override
public void run() {
Conversation[] conversationArray = conversations.values().toArray(new Conversation[0]);
long currentTime = System.currentTimeMillis();
Duration conversationTimeout = XmlUtils.xmlDurationToDuration(
settings.getReferenceSettings().getClientSettings().getConversationTimeout());
Instant currentTime = Instant.now();
for (Conversation conversation : conversationArray) {
if (conversation.hasEnded()) {
conversations.remove(conversation.getConversationID());
} else if (currentTime - conversation.getStartTime() >
settings.getReferenceSettings().getClientSettings().getConversationTimeout().longValue()) {
log.warn("Failing timed out conversation " + conversation.getConversationID() + " " + "(Age " +
(currentTime - conversation.getStartTime()) + "ms)");
failConversation(conversation, "Failing timed out conversation " + conversation.getConversationID());
} else {
Instant startTime = Instant.ofEpochMilli(conversation.getStartTime());
Comment thread
Bohlski marked this conversation as resolved.
Instant expirationTime = startTime.plus(conversationTimeout);
if (expirationTime.isBefore(currentTime)) {
log.warn("Failing timed out conversation {} (Age: {})", conversation.getConversationID(),
TimeUtils.durationToHuman(Duration.between(startTime, currentTime)));
failConversation(conversation, "Failing timed out conversation " + conversation.getConversationID());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ public void conversationTimeout() throws Exception {

settingsForCUT.getRepositorySettings().getCollections().getCollection().get(0).getPillarIDs().getPillarID().clear();
settingsForCUT.getRepositorySettings().getCollections().getCollection().get(0).getPillarIDs().getPillarID().add(PILLAR1_ID);
settingsForCUT.getReferenceSettings().getClientSettings().setConversationTimeout(BigInteger.valueOf(100));
DatatypeFactory factory = DatatypeFactory.newInstance();
settingsForCUT.getReferenceSettings().getClientSettings().setConversationTimeout(factory.newDuration(100));
GetFileClient client = createGetFileClient();

addStep("Request the delivery of a file from a specific pillar. A callback listener should be supplied.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import java.math.BigInteger;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -285,7 +284,9 @@ public void conversationTimeoutTest() throws Exception {
addDescription("Tests the the client handles lack of IdentifyPillarResponses gracefully ");

addStep("Set a 100 ms ConversationTimeout.", "");
settingsForCUT.getReferenceSettings().getClientSettings().setConversationTimeout(BigInteger.valueOf(100));
DatatypeFactory factory = DatatypeFactory.newInstance();
settingsForCUT.getReferenceSettings().getClientSettings()
.setConversationTimeout(factory.newDuration(100));
renewConversationMediator();

addStep("Start the operation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void deleteClientIdentificationTimeout() throws Exception {
Assert.assertEquals(testEventHandler.waitForEvent().getEventType(), OperationEventType.IDENTIFY_REQUEST_SENT);

addStep("Do not respond. Just await the timeout.",
"Should make send a Failure event to the eventhandler.");
"Should make send a Failure event to the event handler.");
Assert.assertEquals(testEventHandler.waitForEvent().getEventType(), OperationEventType.IDENTIFY_TIMEOUT);
Assert.assertEquals(testEventHandler.waitForEvent().getEventType(), OperationEventType.COMPONENT_FAILED);
Assert.assertEquals(testEventHandler.waitForEvent().getEventType(), OperationEventType.FAILED);
Expand Down Expand Up @@ -274,7 +274,7 @@ public void deleteClientOperationTimeout() throws Exception {
Assert.assertEquals(testEventHandler.waitForEvent().getEventType(), OperationEventType.REQUEST_SENT);

addStep("Do not respond. Just await the timeout.",
"Should make send a Failure event to the eventhandler.");
"Should make send a Failure event to the event handler.");
Assert.assertEquals(testEventHandler.waitForEvent().getEventType(), OperationEventType.FAILED);
}

Expand Down
Loading