Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
package org.bitrepository.audittrails;

import org.bitrepository.audittrails.collector.AuditTrailCollector;
import org.bitrepository.audittrails.preserver.AuditTrailPreserver;
import org.bitrepository.audittrails.store.AuditEventIterator;
import org.bitrepository.audittrails.store.AuditTrailStore;
import org.bitrepository.audittrails.webservice.CollectorInfo;
import org.bitrepository.audittrails.webservice.PreservationInfo;
import org.bitrepository.bitrepositoryelements.FileAction;
import org.bitrepository.common.ArgumentValidator;
import org.bitrepository.common.settings.Settings;
Expand All @@ -51,31 +53,46 @@ public class AuditTrailService implements LifeCycledService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final AuditTrailStore store;
private final AuditTrailCollector collector;
private final AuditTrailPreserver preserver;
private final ContributorMediator mediator;
private final Settings settings;

/**
* Constructor for audit trail service.
*
* @param store The store for the audit trail data.
* @param collector The collector of new audit trail data.
* @param preserver The preserver for saving collected audit trail data to a collection.
* Provide non-null preserver for enabling preservation.
* @param mediator The mediator for the communication of this contributor.
* @param settings The AuditTrailService settings.
*/
public AuditTrailService(
AuditTrailStore store,
AuditTrailCollector collector,
ContributorMediator mediator,
Settings settings) {
public AuditTrailService(AuditTrailStore store, AuditTrailCollector collector, AuditTrailPreserver preserver,
ContributorMediator mediator, Settings settings) {
ArgumentValidator.checkNotNull(collector, "AuditTrailCollector collector");
ArgumentValidator.checkNotNull(store, "AuditTrailStore store");
ArgumentValidator.checkNotNull(mediator, "ContributorMediator mediator");
ArgumentValidator.checkNotNull(settings, "Settings settings");

this.store = store;
this.collector = collector;
this.preserver = preserver;
this.mediator = mediator;
this.settings = settings;
}

mediator.start();
/**
* Constructor for audit trail service with disabled preservation.
*
* See {@link #AuditTrailService(AuditTrailStore, AuditTrailCollector, AuditTrailPreserver, ContributorMediator,
* Settings)} for param descriptions.
*/
public AuditTrailService(
AuditTrailStore store,
AuditTrailCollector collector,
ContributorMediator mediator,
Settings settings) {
this(store, collector, null, mediator, settings);
}

/**
Expand All @@ -89,7 +106,7 @@ public AuditTrailService(
* @param reportingComponent Restrict the results to only be reported by this component
* @param actor Restrict the results to only be events caused by this actor
* @param action Restrict the results to only be about this type of action
* @param fingerprint the fingerprint
* @param fingerprint The fingerprint
* @param operationID Restrict the results to only this operationID
* @return an iterator to all AuditTrailEvents matching the criteria from the parameters
*/
Expand Down Expand Up @@ -128,6 +145,18 @@ public List<CollectorInfo> getCollectorInfos() {
return infos;
}

/**
* Get preservation info if preservation of audit trails is enabled.
*
* @return PreservationInfo or null if not enabled.
*/
public PreservationInfo getPreservationInfo() {
if (preserver == null ) {
return null;
}
return preserver.getPreservationInfo();
}
Comment thread
Bohlski marked this conversation as resolved.

/**
* Get the list of known contributors from the backend.
*
Expand All @@ -139,14 +168,21 @@ public List<String> getContributors() {

@Override
public void start() {
if (preserver != null) {
preserver.start();
}
mediator.start();
}

@Override
public void shutdown() {
collector.close();
if (preserver != null) {
preserver.close();
}
store.close();
mediator.close();

MessageBus messageBus = MessageBusManager.getMessageBus();
if (messageBus != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,17 @@ public static synchronized AuditTrailService getAuditTrailService() {
securityManager, serviceSettings.getID());

AuditTrailCollector collector = new AuditTrailCollector(settings, client, store, alarmDispatcher);
AuditTrailPreserver preserver;

if (serviceSettings.isSetAuditTrailPreservation()) {
preserver = new LocalAuditTrailPreserver(
log.info("Audit trail preservation enabled in configuration. Audit trail service will preserve trails.");
AuditTrailPreserver preserver = new LocalAuditTrailPreserver(
settings, store, putClient, ProtocolComponentFactory.getInstance().getFileExchange(settings));
preserver.start();
auditTrailService = new AuditTrailService(store, collector, preserver, mediator, settings);
} else {
log.info("Audit trail preservation disabled, no configuration defined.");
auditTrailService = new AuditTrailService(store, collector, mediator, settings);
}

auditTrailService = new AuditTrailService(store, collector, mediator, settings);
auditTrailService.start();
}

return auditTrailService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package org.bitrepository.audittrails.collector;

import org.bitrepository.common.TimerTaskSchedule;
import org.bitrepository.common.utils.SettingsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,15 +33,15 @@ public class AuditTrailCollectionTimerTask extends TimerTask {
private final Logger log = LoggerFactory.getLogger(getClass());

private final IncrementalCollector collector;
private final CollectionSchedule schedule;
private final TimerTaskSchedule schedule;

/**
* @param collector The collector doing the actual work.
* @param interval The interval between running this timer task.
* @param gracePeriod The period that should pass before the first scheduled collection
*/
public AuditTrailCollectionTimerTask(IncrementalCollector collector, long interval, int gracePeriod) {
this.schedule = new CollectionSchedule(interval, gracePeriod);
this.schedule = new TimerTaskSchedule(interval, gracePeriod);
this.collector = collector;
log.info("Scheduled next collection of audit trails for {}", schedule.getNextRun());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public CollectorInfo getCollectorInfo(String collectionID) {
info.setLastDuration("Collection has not finished yet");
}
} else {
info.setLastStart("Audit trail collection have not started");
info.setLastStart("Audit trail collection has not started");
info.setLastDuration("Not available");
}
info.setNextStart(TimeUtils.shortDate(nextRun));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public class IncrementalCollector {
private long collectedAudits = 0;

/**
* @param collectionID the collection ID
* @param collectionID The collection ID
* @param clientID The clientID to use for the requests.
* @param client The client to use for the operations.
* @param store Where to persist the received results.
* @param maxNumberOfResults A optional limit on the number of audit trail events to request. If not set, {}
* @param alarmDispatcher the alarm dispatcher
* @param maxNumberOfResults An optional limit on the number of audit trail events to request. If not set, {}
* @param alarmDispatcher The alarm dispatcher
*/
public IncrementalCollector(String collectionID, String clientID, AuditTrailClient client, AuditTrailStore store,
int maxNumberOfResults, AlarmDispatcher alarmDispatcher) {
Expand Down Expand Up @@ -103,7 +103,6 @@ public long getNumberOfCollectedAudits() {

/**
* Setup and initiates the collection of audit trails through the client.
* Adds one to the sequence number to request only newer audit trails.
*
* @param contributors the collection of IDs of contributor
*/
Expand All @@ -121,17 +120,18 @@ public void performCollection(Collection<String> contributors) {
}

/**
* Collect a page of audit trails from the active contributors
* Collect a page of audit trails from the active contributors.
* Adds 1 to the sequence number to only collect newer audit trails.
*
* @param contributors The contributors to collect from
* @return Collection<String> the contributors that have more audits to collect
*/
private Collection<String> collect(Collection<String> contributors) {
List<AuditTrailQuery> queries = new ArrayList<>();

for (String contributorId : contributors) {
long seq = store.largestSequenceNumber(contributorId, collectionID);
queries.add(new AuditTrailQuery(contributorId, seq + 1, null, maxNumberOfResults));
for (String contributorID : contributors) {
long seqNum = store.largestSequenceNumber(contributorID, collectionID);
queries.add(new AuditTrailQuery(contributorID, seqNum + 1, null, maxNumberOfResults));
}

log.debug("Collecting of AuditTrails for '{}' with ContributorQueries: {}", collectionID, queries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ public class AuditPacker {
/**
* Map between the contributor id and the reached preservation sequence number.
*/
private final Map<String, Long> seqReached = new HashMap<>();
private final Map<String, Long> seqNumsReached = new HashMap<>();
/**
* Whether the output stream should be appended to the file.
*/
private static final boolean APPEND = true;

private long packedAuditCount = 0;

/**
* Constructor.
*
Expand All @@ -77,7 +79,7 @@ public AuditPacker(AuditTrailStore store, AuditTrailPreservation settings, Strin
this.directory = FileUtils.retrieveDirectory(settings.getAuditTrailPreservationTemporaryDirectory());
this.contributors.addAll(SettingsUtils.getAuditContributorsForCollection(collectionID));

initialiseReachedSequenceNumbers();
initializeReachedSequenceNumbers();
}

/**
Expand All @@ -86,16 +88,16 @@ public AuditPacker(AuditTrailStore store, AuditTrailPreservation settings, Strin
* @return A mapping between the contributor ids and their preservation sequence numbers.
*/
public Map<String, Long> getSequenceNumbersReached() {
return new HashMap<>(seqReached);
return new HashMap<>(seqNumsReached);
}

/**
* Retrieves the preservation sequence number for each contributor and inserts it into the map.
*/
private void initialiseReachedSequenceNumbers() {
private void initializeReachedSequenceNumbers() {
for (String contributor : contributors) {
Long seq = store.getPreservationSequenceNumber(contributor, collectionID);
seqReached.put(contributor, seq);
Long seqNum = store.getPreservationSequenceNumber(contributor, collectionID);
seqNumsReached.put(contributor, seqNum);
}
}

Expand All @@ -107,6 +109,7 @@ private void initialiseReachedSequenceNumbers() {
* @return A compressed file with all the audit trails.
*/
public synchronized File createNewPackage() {
resetPackedAuditCount();
File container = new File(directory, collectionID + "-audit-trails-" + System.currentTimeMillis());
try {
if (container.createNewFile()) {
Expand All @@ -124,6 +127,14 @@ public synchronized File createNewPackage() {
return null;
}

/**
* Resets {@link #packedAuditCount}.
* Done before creating a new package to ensure only new packed audits are counted.
*/
private void resetPackedAuditCount() {
packedAuditCount = 0;
}

/**
* Packs all newest audit trails from every contributor into the given file.
*
Expand All @@ -148,15 +159,16 @@ private void packContributors(File container) throws IOException {
/**
* Writes all the newest audit trails for a single contributor to the PrintWriter.
*
* @param contributorId The id of the contributor to write the files for.
* @param contributorID The id of the contributor to write the files for.
* @param writer The PrinterWriter where the output will be written.
*/
private void packContributor(String contributorId, PrintWriter writer) {
long nextSeqNumber = store.getPreservationSequenceNumber(contributorId, collectionID);
private void packContributor(String contributorID, PrintWriter writer) {
long nextSeqNumber = store.getPreservationSequenceNumber(contributorID, collectionID) + 1;
long largestSeqNumber = -1;
long numPackedAudits = 0;
log.debug("Starting to pack AuditTrails for contributor: " + contributorId + " for collection: " + collectionID);
AuditEventIterator iterator = store.getAuditTrailsByIterator(null, collectionID, contributorId, nextSeqNumber, null,
log.debug("Starting to pack AuditTrails at seq-number {} for contributor: {} for collection: {}",
nextSeqNumber, contributorID, collectionID);
AuditEventIterator iterator = store.getAuditTrailsByIterator(null, collectionID, contributorID, nextSeqNumber, null,
null, null, null, null, null, null);
long timeStart = System.currentTimeMillis();
long logInterval = 1000;
Expand All @@ -171,12 +183,15 @@ private void packContributor(String contributorId, PrintWriter writer) {
writer.println(event);

if ((numPackedAudits % logInterval) == 0) {
log.debug("Packed " + numPackedAudits + " AuditTrails in: " + (System.currentTimeMillis() - timeStart) + " ms");
log.debug("Packed {} AuditTrails in: {} ms", numPackedAudits, System.currentTimeMillis() - timeStart);
}
}
log.debug("Packed a total of: " + numPackedAudits + " AuditTrails in: " + (System.currentTimeMillis() - timeStart) + " ms");
log.debug("Packed a total of: {} AuditTrails in: {} ms",
numPackedAudits, System.currentTimeMillis() - timeStart);

if (numPackedAudits > 0) {
seqReached.put(contributorId, largestSeqNumber);
packedAuditCount += numPackedAudits;
seqNumsReached.put(contributorID, largestSeqNumber);
}
}

Expand All @@ -194,4 +209,13 @@ private File createCompressedFile(File fileToCompress) throws IOException {
FileUtils.zipFile(fileToCompress, zippedFile);
return zippedFile;
}

/**
* Get the last count of packed audits i.e. count of new audits from all contributors for the collection.
*
* @return Count of packed audits.
*/
public long getPackedAuditCount() {
return packedAuditCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ public class AuditPreservationEventHandler implements EventHandler {
private final String collectionID;

/**
* @param preservationSequenceNumber The map between the contributor ids and their respective sequence number.
* @param contributorSequenceNumbers The map between the contributor ids and their respective sequence number.
* @param store The store which should be updated with these sequence numbers.
* @param collectionID The ID of the collection that needs to have its sequence number updated.
*/
public AuditPreservationEventHandler(Map<String, Long> preservationSequenceNumber, AuditTrailStore store, String collectionID) {
this.seqNumbers = preservationSequenceNumber;
public AuditPreservationEventHandler(Map<String, Long> contributorSequenceNumbers, AuditTrailStore store,
String collectionID) {
this.seqNumbers = contributorSequenceNumbers;
this.store = store;
this.collectionID = collectionID;
}
Expand All @@ -58,8 +59,9 @@ public AuditPreservationEventHandler(Map<String, Long> preservationSequenceNumbe
public void handleEvent(OperationEvent event) {
if (event.getEventType() == OperationEventType.COMPLETE) {
updateStoreWithResults();

} else {
log.debug("Event for preservation of audit trails: " + event.toString());
log.debug("Event for preservation of audit trails: {}", event);
}
}

Expand All @@ -68,11 +70,12 @@ public void handleEvent(OperationEvent event) {
*/
private void updateStoreWithResults() {
for (Map.Entry<String, Long> entry : seqNumbers.entrySet()) {
if (store.havePreservationKey(entry.getKey(), collectionID)) {
store.setPreservationSequenceNumber(entry.getKey(), collectionID, entry.getValue());
String contributorID = entry.getKey();
if (store.hasPreservationKey(contributorID, collectionID)) {
store.setPreservationSequenceNumber(contributorID, collectionID, entry.getValue());
} else {
log.debug("Preservation key for contributor: " + entry.getKey() + " in collection: " + collectionID +
" is not known by the database.");
log.debug("Preservation key for contributor: {} in collection: {} is not known by the database.",
contributorID, collectionID);
}
}
}
Expand Down
Loading