169 changes: 104 additions & 65 deletions src/main/java/edu/harvard/iq/dataverse/util/bagit/BagGenerator.java
@@ -47,6 +47,7 @@
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.text.WordUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -58,7 +59,7 @@
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.ssl.SSLContextBuilder;

import org.apache.http.util.EntityUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -90,7 +91,8 @@ public class BagGenerator {

private int timeout = 60;
private RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build();
.setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000)
.setCookieSpec(CookieSpecs.STANDARD).build();
protected CloseableHttpClient client;
private PoolingHttpClientConnectionManager cm = null;
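The new CookieSpecs.STANDARD setting opts the shared client into RFC 6265 cookie parsing, which also silences the "Invalid cookie header" warnings HttpClient 4.x logs under its legacy default policy. A minimal sketch of a client built with this configuration (standard Apache HttpClient 4.5 API; the 60-second timeout mirrors the field above):

    RequestConfig config = RequestConfig.custom()
            .setConnectTimeout(60 * 1000)            // TCP connect timeout, ms
            .setConnectionRequestTimeout(60 * 1000)  // wait for a pooled connection
            .setSocketTimeout(60 * 1000)             // per-read timeout
            .setCookieSpec(CookieSpecs.STANDARD)     // RFC 6265 cookie handling
            .build();
    CloseableHttpClient client = HttpClients.custom()
            .setDefaultRequestConfig(config)
            .build();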

@@ -281,7 +283,8 @@ public boolean generateBag(OutputStream outputStream) throws Exception {
}
createFileFromString(manifestName, sha1StringBuffer.toString());
} else {
logger.warning("No Hash values sent - Bag File does not meet BagIT specification requirement");
logger.warning("No hash values (no files?) - sending an empty manifest to nominally comply with the BagIt specification requirement");
createFileFromString("manifest-md5.txt", "");
}
// bagit.txt - Required by spec
createFileFromString("bagit.txt", "BagIt-Version: 1.0\r\nTag-File-Character-Encoding: UTF-8");
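For context, a BagIt payload manifest is one line per payload file - checksum, whitespace, then the file's path inside the bag - so an empty manifest-md5.txt is still well-formed for a bag with no payload files. An illustrative manifest line (the path is made up; the checksum is just the MD5 of the empty string):

    d41d8cd98f00b204e9800998ecf8427e  data/files/example.csv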
Expand Down Expand Up @@ -363,6 +366,7 @@ public boolean generateBag(String bagName, boolean temp) {
// Create an output stream backed by the file
bagFileOS = new FileOutputStream(bagFile);
if (generateBag(bagFileOS)) {
//The generateBag call sets this.bagName to the correct value
validateBagFile(bagFile);
if (usetemp) {
logger.fine("Moving tmp zip");
@@ -388,7 +392,8 @@ public void validateBag(String bagId) {
ZipFile zf = null;
InputStream is = null;
try {
zf = new ZipFile(getBagFile(bagId));
File bagFile = getBagFile(bagId);
zf = new ZipFile(bagFile);
ZipArchiveEntry entry = zf.getEntry(getValidName(bagId) + "/manifest-sha1.txt");
if (entry != null) {
logger.info("SHA1 hashes used");
Expand Down Expand Up @@ -428,7 +433,7 @@ public void validateBag(String bagId) {
}
IOUtils.closeQuietly(is);
logger.info("HashMap Map contains: " + checksumMap.size() + " entries");
checkFiles(checksumMap, zf);
checkFiles(checksumMap, bagFile);
} catch (IOException io) {
logger.log(Level.SEVERE,"Could not validate Hashes", io);
} catch (Exception e) {
@@ -457,14 +462,13 @@ public File getBagFile(String bagID) throws Exception {

private void validateBagFile(File bagFile) throws IOException {
// Run a confirmation test - should verify all files and hashes
ZipFile zf = new ZipFile(bagFile);

// Check files calculates the hashes and file sizes and reports on
// whether hashes are correct
checkFiles(checksumMap, zf);
checkFiles(checksumMap, bagFile);

logger.info("Data Count: " + dataCount);
logger.info("Data Size: " + totalDataSize);
zf.close();
}

public static String getValidName(String bagName) {
@@ -481,7 +485,7 @@ private void processContainer(JsonObject item, String currentPath) throws IOException
} else if (item.has(JsonLDTerm.schemaOrg("name").getLabel())) {
title = item.get(JsonLDTerm.schemaOrg("name").getLabel()).getAsString();
}

logger.fine("Adding " + title + "/ to path " + currentPath);
currentPath = currentPath + title + "/";
int containerIndex = -1;
try {
@@ -557,6 +561,7 @@ private void processContainer(JsonObject item, String currentPath) throws IOException
logger.warning("Duplicate/Collision: " + child.get("@id").getAsString() + " has SHA1 Hash: "
+ childHash + " in: " + bagID);
}
logger.fine("Adding " + childPath + " with hash " + childHash + " to checksumMap");
checksumMap.put(childPath, childHash);
}
}
@@ -700,29 +705,39 @@ private void createFileFromURL(final String relPath, final String uri)
addEntry(archiveEntry, supp);
}

private void checkFiles(HashMap<String, String> shaMap, ZipFile zf) {
private void checkFiles(HashMap<String, String> shaMap, File bagFile) {
ExecutorService executor = Executors.newFixedThreadPool(numConnections);
BagValidationJob.setZipFile(zf);
BagValidationJob.setBagGenerator(this);
logger.fine("Validating hashes for zipped data files");
int i = 0;
for (Entry<String, String> entry : shaMap.entrySet()) {
BagValidationJob vj = new BagValidationJob(entry.getValue(), entry.getKey());
executor.execute(vj);
i++;
if (i % 1000 == 0) {
logger.info("Queuing Hash Validations: " + i);
}
}
logger.fine("All Hash Validations Queued: " + i);

executor.shutdown();
ZipFile zf = null;
try {
while (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
logger.fine("Awaiting completion of hash calculations.");
zf = new ZipFile(bagFile);

BagValidationJob.setZipFile(zf);
BagValidationJob.setBagGenerator(this);
logger.fine("Validating hashes for zipped data files");
int i = 0;
for (Entry<String, String> entry : shaMap.entrySet()) {
BagValidationJob vj = new BagValidationJob(bagName, entry.getValue(), entry.getKey());
executor.execute(vj);
i++;
if (i % 1000 == 0) {
logger.info("Queuing Hash Validations: " + i);
}
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE,"Hash Calculations interrupted", e);
logger.fine("All Hash Validations Queued: " + i);

executor.shutdown();
try {
while (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
logger.fine("Awaiting completion of hash calculations.");
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Hash Calculations interrupted", e);
}
} catch (IOException e1) {
logger.log(Level.SEVERE, "Could not open zipped bag file for validation", e1);
} finally {
IOUtils.closeQuietly(zf);
}
logger.fine("Hash Validations Completed");
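The structural change in checkFiles is ZipFile ownership: the method now opens the zip from the passed File and closes it in the finally block via commons-compress's IOUtils.closeQuietly, rather than trusting callers to close a shared handle. Note that executor.shutdown()/awaitTermination also moved inside the try, which is required: the queued BagValidationJobs read entries from the zip, so it must stay open until they finish. Since this ZipFile implements Closeable, an equivalent try-with-resources sketch (assuming the surrounding class context) would be:

    try (ZipFile zf = new ZipFile(bagFile)) {
        BagValidationJob.setZipFile(zf);
        // queue jobs and await the executor here, while zf is still open
    } catch (IOException e) {
        logger.log(Level.SEVERE, "Could not open zipped bag file", e);
    }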

Expand Down Expand Up @@ -907,8 +922,8 @@ public void incrementTotalDataSize(long inc) {
totalDataSize += inc;
}

public String getHashtype() {
return hashtype.toString();
public ChecksumType getHashtype() {
return hashtype;
}

// Gets all "Has Part" children, standardized to send an array with 0, 1, or
@@ -993,46 +1008,70 @@ private HttpGet createNewGetRequest(URI url, String returnType) {
return request;
}

InputStreamSupplier getInputStreamSupplier(final String uri) {
InputStreamSupplier getInputStreamSupplier(final String uriString) {

return new InputStreamSupplier() {
public InputStream get() {
int tries = 0;
while (tries < 5) {
try {
logger.fine("Get # " + tries + " for " + uri);
HttpGet getMap = createNewGetRequest(new URI(uri), null);
logger.finest("Retrieving " + tries + ": " + uri);
CloseableHttpResponse response;
//Note - if we ever need to pass an HttpClientContext, we need a new one per thread.
response = client.execute(getMap);
if (response.getStatusLine().getStatusCode() == 200) {
logger.finest("Retrieved: " + uri);
return response.getEntity().getContent();
}
logger.fine("Status: " + response.getStatusLine().getStatusCode());
tries++;

} catch (ClientProtocolException e) {
tries += 5;
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// Retry if this is a potentially temporary error such
// as a timeout
tries++;
logger.log(Level.WARNING,"Attempt# " + tries + " : Unable to retrieve file: " + uri, e);
if (tries == 5) {
logger.severe("Final attempt failed for " + uri);
try {
URI uri = new URI(uriString);

int tries = 0;
while (tries < 5) {

logger.fine("Get # " + tries + " for " + uriString);
HttpGet getFile = createNewGetRequest(uri, null);
logger.finest("Retrieving " + tries + ": " + uriString);
CloseableHttpResponse response = null;
try {
response = client.execute(getFile);
// Note - if we ever need to pass an HttpClientContext, we need a new one per
// thread.
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200) {
logger.finest("Retrieved: " + uri);
return response.getEntity().getContent();
}
logger.warning("Attempt: " + tries + " - Unexpected Status when retrieving " + uriString
+ " : " + statusCode);
if (statusCode < 500) {
logger.fine("Will not retry for 40x errors");
tries += 5;
} else {
tries++;
}
// Error handling
if (response != null) {
try {
EntityUtils.consumeQuietly(response.getEntity());
response.close();
} catch (IOException io) {
logger.warning(
"Exception closing response after status: " + statusCode + " on " + uri);
}
}
} catch (ClientProtocolException e) {
tries += 5;
logger.log(Level.SEVERE, "Protocol error retrieving " + uriString, e);
} catch (IOException e) {
// Retry if this is a potentially temporary error such
// as a timeout
tries++;
logger.log(Level.WARNING, "Attempt# " + tries + " : Unable to retrieve file: " + uriString,
e);
if (tries == 5) {
logger.severe("Final attempt failed for " + uriString);
}
e.printStackTrace();
}
e.printStackTrace();
} catch (URISyntaxException e) {
tries += 5;
// TODO Auto-generated catch block
e.printStackTrace();

}

} catch (URISyntaxException e) {
logger.log(Level.SEVERE, "Bad URI: " + uriString, e);
}
logger.severe("Could not read: " + uri);
logger.severe("Could not read: " + uriString);
return null;
}
};
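The rewritten supplier encodes its retry policy entirely in the tries counter: transient failures (5xx status, IOException) increment it, while permanent ones (4xx status, protocol or URI errors) jump it past the limit with tries += 5 so the loop exits immediately. The same policy in isolation, with doGet() as a hypothetical helper that performs one request and returns its status code:

    for (int tries = 0; tries < 5; ) {
        try {
            int status = doGet();             // hypothetical single GET
            if (status == 200) {
                break;                        // success: stream goes to the caller
            }
            tries += (status < 500) ? 5 : 1;  // 4xx: fail fast; 5xx: retry
        } catch (IOException e) {
            tries++;                          // timeouts etc. are worth another attempt
        }
    }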
src/main/java/edu/harvard/iq/dataverse/util/bagit/BagValidationJob.java
@@ -25,6 +25,7 @@
import org.apache.commons.compress.archivers.zip.ZipFile;

import edu.harvard.iq.dataverse.DataFile;
import edu.harvard.iq.dataverse.DataFile.ChecksumType;

import org.apache.commons.compress.utils.IOUtils;

@@ -41,13 +42,15 @@ public class BagValidationJob implements Runnable {

private String hash;
private String name;
private static String hashtype;
private String basePath;
private static ChecksumType hashtype;

public BagValidationJob(String value, String key) throws IllegalStateException {
public BagValidationJob(String bagName, String value, String key) throws IllegalStateException {
if (zf == null || bagGenerator == null) {
throw new IllegalStateException(
"Static Zipfile and BagGenerator must be set before creating ValidationJobs");
}
basePath=bagName;
hash = value;
name = key;

@@ -60,24 +63,28 @@ public BagValidationJob(String value, String key) throws IllegalStateException {
*/
public void run() {

String realHash = generateFileHash(name, zf);
String realHash = generateFileHash(basePath + "/" + name, zf);
if (hash.equals(realHash)) {
log.fine("Valid hash for " + name);
} else {
log.severe("Invalid " + bagGenerator.getHashtype() + " for " + name);
log.severe("Invalid " + bagGenerator.getHashtype().name() + " for " + name);
log.fine("As sent: " + hash);
log.fine("As calculated: " + realHash);
}
}
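The new basePath prefix matters because entries in the zipped bag are rooted at the bag's directory name, so a lookup by the payload path alone comes back null. Illustratively, with a made-up bag name and path:

    // zip layout: myBag/bagit.txt, myBag/manifest-sha1.txt, myBag/data/file1.csv
    ZipArchiveEntry entry = zf.getEntry("myBag" + "/" + "data/file1.csv"); // basePath + "/" + name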

private String generateFileHash(String name, ZipFile zf) {

String realHash = null;

ZipArchiveEntry archiveEntry1 = zf.getEntry(name);

if(archiveEntry1 != null) {
// Error check - add file sizes to compare against supplied stats

log.fine("Getting stream for " + name);
long start = System.currentTimeMillis();
InputStream inputStream = null;
String realHash = null;

try {
inputStream = zf.getInputStream(archiveEntry1);
if (hashtype.equals(DataFile.ChecksumType.SHA1)) {
@@ -89,7 +96,7 @@ private String generateFileHash(String name, ZipFile zf) {
} else if (hashtype.equals(DataFile.ChecksumType.MD5)) {
realHash = DigestUtils.md5Hex(inputStream);
} else {
log.warning("Unknown hash type: " + hashtype);
log.warning("Unknown hash type: " + hashtype.name());
}

} catch (ZipException e) {
@@ -104,6 +111,9 @@ private String generateFileHash(String name, ZipFile zf) {
log.fine("Retrieve/compute time = " + (System.currentTimeMillis() - start) + " ms");
// Error check - add file sizes to compare against supplied stats
bagGenerator.incrementTotalDataSize(archiveEntry1.getSize());
} else {
log.warning("Entry " + name + " not found in zipped bag: not validated");
}
return realHash;
}
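The hashtype branches above map Dataverse's ChecksumType onto commons-codec DigestUtils calls. A condensed sketch of the same mapping (SHA1 and MD5 appear in this diff; the SHA256/SHA512 constants are assumed from the enum, and stream opening/closing is left to the caller):

    static String hexDigest(DataFile.ChecksumType type, InputStream in) throws IOException {
        switch (type) {
            case SHA1:   return DigestUtils.sha1Hex(in);
            case SHA256: return DigestUtils.sha256Hex(in);  // assumed enum constant
            case SHA512: return DigestUtils.sha512Hex(in);  // assumed enum constant
            case MD5:    return DigestUtils.md5Hex(in);
            default:     throw new IllegalArgumentException("Unknown hash type: " + type.name());
        }
    }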
