From 2d981e3d2326c29316b445c5503657346259c03c Mon Sep 17 00:00:00 2001 From: qqmyers Date: Wed, 13 Apr 2022 18:35:30 -0400 Subject: [PATCH 1/5] duracloud and google thread mgmt fixes --- .../impl/DuraCloudSubmitToArchiveCommand.java | 141 ++++++++++++------ .../GoogleCloudSubmitToArchiveCommand.java | 71 +++++---- 2 files changed, 138 insertions(+), 74 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java index 468e99f24c1..b3b303d7407 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java @@ -41,19 +41,38 @@ public class DuraCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveComm private static final String DURACLOUD_PORT = ":DuraCloudPort"; private static final String DURACLOUD_HOST = ":DuraCloudHost"; private static final String DURACLOUD_CONTEXT = ":DuraCloudContext"; - + private static final int DEFAULT_THREADS = 2; + + boolean success = false; + int bagThreads = DEFAULT_THREADS; public DuraCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { super(aRequest, version); } @Override - public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken token, Map requestedSettings) { + public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken token, + Map requestedSettings) { - String port = requestedSettings.get(DURACLOUD_PORT) != null ? requestedSettings.get(DURACLOUD_PORT) : DEFAULT_PORT; - String dpnContext = requestedSettings.get(DURACLOUD_CONTEXT) != null ? requestedSettings.get(DURACLOUD_CONTEXT) : DEFAULT_CONTEXT; + String port = requestedSettings.get(DURACLOUD_PORT) != null ? requestedSettings.get(DURACLOUD_PORT) + : DEFAULT_PORT; + String dpnContext = requestedSettings.get(DURACLOUD_CONTEXT) != null ? requestedSettings.get(DURACLOUD_CONTEXT) + : DEFAULT_CONTEXT; String host = requestedSettings.get(DURACLOUD_HOST); + + if (requestedSettings.get(BagGenerator.BAG_GENERATOR_THREADS) != null) { + try { + bagThreads=Integer.valueOf(requestedSettings.get(BagGenerator.BAG_GENERATOR_THREADS)); + } catch (NumberFormatException nfe) { + logger.warning("Can't parse the value of setting " + BagGenerator.BAG_GENERATOR_THREADS + " as an integer - using default:" + DEFAULT_THREADS); + } + } + if (host != null) { Dataset dataset = dv.getDataset(); + // ToDo - change after HDC 3A changes to status reporting + // This will make the archivalCopyLocation non-null after a failure which should + // stop retries + dv.setArchivalCopyLocation("Attempted"); if (dataset.getLockFor(Reason.finalizePublication) == null && dataset.getLockFor(Reason.FileValidationFailed) == null) { // Use Duracloud client classes to login @@ -61,9 +80,24 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t Credential credential = new Credential(System.getProperty("duracloud.username"), System.getProperty("duracloud.password")); storeManager.login(credential); - - String spaceName = dataset.getGlobalId().asString().replace(':', '-').replace('/', '-') - .replace('.', '-').toLowerCase(); + /* + * Aliases can contain upper case characters which are not allowed in space + * names. Similarly, aliases can contain '_' which isn't allowed in a space + * name. The line below replaces any upper case chars with lowercase and + * replaces any '_' with '.-' . The '-' after the dot assures we don't break the + * rule that + * "The last period in a aspace may not immediately be followed by a number". + * (Although we could check, it seems better to just add '.-' all the time.As + * written the replaceAll will also change any chars not valid in a spaceName to + * '.' which would avoid code breaking if the alias constraints change. That + * said, this line may map more than one alias to the same spaceName, e.g. + * "test" and "Test" aliases both map to the "test" space name. This does not + * break anything but does potentially put bags from more than one collection in + * the same space. + */ + String spaceName = dataset.getOwner().getAlias().toLowerCase().replaceAll("[^a-z0-9-]", ".dcsafe"); + String baseFileName = dataset.getGlobalId().asString().replace(':', '-').replace('/', '-') + .replace('.', '-').toLowerCase() + "_v" + dv.getFriendlyVersionNumber(); ContentStore store; try { @@ -75,87 +109,109 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t */ store = storeManager.getPrimaryContentStore(); // Create space to copy archival files to - store.createSpace(spaceName); + if (!store.spaceExists(spaceName)) { + store.createSpace(spaceName); + } DataCitation dc = new DataCitation(dv); Map metadata = dc.getDataCiteMetadata(); String dataciteXml = DOIDataCiteRegisterService.getMetadataFromDvObject( dv.getDataset().getGlobalId().asString(), metadata, dv.getDataset()); MessageDigest messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream dataciteIn = new PipedInputStream(); DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { + try (PipedInputStream dataciteIn = new PipedInputStream(); + DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { // Add datacite.xml file - new Thread(new Runnable() { + Thread dcThread = new Thread(new Runnable() { public void run() { try (PipedOutputStream dataciteOut = new PipedOutputStream(dataciteIn)) { dataciteOut.write(dataciteXml.getBytes(Charset.forName("utf-8"))); dataciteOut.close(); + success=true; } catch (Exception e) { logger.severe("Error creating datacite.xml: " + e.getMessage()); // TODO Auto-generated catch block e.printStackTrace(); - throw new RuntimeException("Error creating datacite.xml: " + e.getMessage()); } } - }).start(); - //Have seen Pipe Closed errors for other archivers when used as a workflow without this delay loop - int i=0; - while(digestInputStream.available()<=0 && i<100) { + }); + dcThread.start(); + // Have seen Pipe Closed errors for other archivers when used as a workflow + // without this delay loop + int i = 0; + while (digestInputStream.available() <= 0 && i < 100) { Thread.sleep(10); i++; } - String checksum = store.addContent(spaceName, "datacite.xml", digestInputStream, -1l, null, null, - null); + String checksum = store.addContent(spaceName, baseFileName + "_datacite.xml", digestInputStream, + -1l, null, null, null); logger.fine("Content: datacite.xml added with checksum: " + checksum); + dcThread.join(); String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe("Failure on " + baseFileName); + logger.severe(success ? checksum + " not equal to " + localchecksum : "failed to transfer to DuraCloud"); + try { + store.deleteContent(spaceName, baseFileName + "_datacite.xml"); + } catch (ContentStoreException cse) { + logger.warning(cse.getMessage()); + } return new Failure("Error in transferring DataCite.xml file to DuraCloud", "DuraCloud Submission Failure: incomplete metadata transfer"); } // Store BagIt file - String fileName = spaceName + "v" + dv.getFriendlyVersionNumber() + ".zip"; + success = false; + String fileName = baseFileName + ".zip"; // Add BagIt ZIP file // Although DuraCloud uses SHA-256 internally, it's API uses MD5 to verify the // transfer + messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream in = new PipedInputStream(); DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { - new Thread(new Runnable() { + try (PipedInputStream in = new PipedInputStream(); + DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { + Thread bagThread = new Thread(new Runnable() { public void run() { - try (PipedOutputStream out = new PipedOutputStream(in)){ + try (PipedOutputStream out = new PipedOutputStream(in)) { // Generate bag BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); + bagger.setNumConnections(bagThreads); bagger.setAuthenticationKey(token.getTokenString()); bagger.generateBag(out); + success = true; } catch (Exception e) { logger.severe("Error creating bag: " + e.getMessage()); // TODO Auto-generated catch block e.printStackTrace(); - throw new RuntimeException("Error creating bag: " + e.getMessage()); } } - }).start(); - i=0; - while(digestInputStream.available()<=0 && i<100) { + }); + bagThread.start(); + i = 0; + while (digestInputStream.available() <= 0 && i < 100) { Thread.sleep(10); i++; } - checksum = store.addContent(spaceName, fileName, digestInputStream2, -1l, null, null, - null); - logger.fine("Content: " + fileName + " added with checksum: " + checksum); - localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + checksum = store.addContent(spaceName, fileName, digestInputStream2, -1l, null, null, null); + bagThread.join(); + if (success) { + logger.fine("Content: " + fileName + " added with checksum: " + checksum); + localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); + } + if (!success || !checksum.equals(localchecksum)) { + logger.severe("Failure on " + fileName); + logger.severe(success ? checksum + " not equal to " + localchecksum : "failed to transfer to DuraCloud"); + try { + store.deleteContent(spaceName, fileName); + store.deleteContent(spaceName, baseFileName + "_datacite.xml"); + } catch (ContentStoreException cse) { + logger.warning(cse.getMessage()); + } return new Failure("Error in transferring Zip file to DuraCloud", "DuraCloud Submission Failure: incomplete archive transfer"); } - } catch (RuntimeException rte) { - logger.severe(rte.getMessage()); - return new Failure("Error in generating Bag", - "DuraCloud Submission Failure: archive file not created"); } logger.fine("DuraCloud Submission step: Content Transferred"); @@ -179,10 +235,6 @@ public void run() { e.printStackTrace(); return new Failure("Error in transferring file to DuraCloud", "DuraCloud Submission Failure: archive file not transferred"); - } catch (RuntimeException rte) { - logger.severe(rte.getMessage()); - return new Failure("Error in generating datacite.xml file", - "DuraCloud Submission Failure: metadata file not created"); } catch (InterruptedException e) { logger.warning(e.getLocalizedMessage()); e.printStackTrace(); @@ -194,12 +246,13 @@ public void run() { if (!(1 == dv.getVersion()) || !(0 == dv.getMinorVersionNumber())) { mesg = mesg + ": Prior Version archiving not yet complete?"; } - return new Failure("Unable to create DuraCloud space with name: " + spaceName, mesg); + return new Failure("Unable to create DuraCloud space with name: " + baseFileName, mesg); } catch (NoSuchAlgorithmException e) { logger.severe("MD5 MessageDigest not available!"); } } else { - logger.warning("DuraCloud Submision Workflow aborted: Dataset locked for finalizePublication, or because file validation failed"); + logger.warning( + "DuraCloud Submision Workflow aborted: Dataset locked for finalizePublication, or because file validation failed"); return new Failure("Dataset locked"); } return WorkflowStepResult.OK; @@ -207,5 +260,5 @@ public void run() { return new Failure("DuraCloud Submission not configured - no \":DuraCloudHost\"."); } } - + } diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java index cb729a9807a..6ea7afcc734 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java @@ -33,6 +33,7 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; @RequiredPermissions(Permission.PublishDataset) @@ -42,6 +43,8 @@ public class GoogleCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveCo private static final String GOOGLECLOUD_BUCKET = ":GoogleCloudBucket"; private static final String GOOGLECLOUD_PROJECT = ":GoogleCloudProject"; + boolean success = false; + public GoogleCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { super(aRequest, version); } @@ -55,7 +58,7 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t if (bucketName != null && projectName != null) { Storage storage; try { - FileInputStream fis = new FileInputStream(System.getProperty("dataverse.files.directory") + System.getProperty("file.separator")+ "googlecloudkey.json"); + FileInputStream fis = new FileInputStream(System.getProperty("dataverse.files.directory") + System.getProperty("file.separator") + "googlecloudkey.json"); storage = StorageOptions.newBuilder() .setCredentials(ServiceAccountCredentials.fromStream(fis)) .setProjectId(projectName) @@ -73,42 +76,51 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t Map metadata = dc.getDataCiteMetadata(); String dataciteXml = DOIDataCiteRegisterService.getMetadataFromDvObject( dv.getDataset().getGlobalId().asString(), metadata, dv.getDataset()); - String blobIdString = null; MessageDigest messageDigest = MessageDigest.getInstance("MD5"); try (PipedInputStream dataciteIn = new PipedInputStream(); DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { // Add datacite.xml file - new Thread(new Runnable() { + Thread dcThread = new Thread(new Runnable() { public void run() { try (PipedOutputStream dataciteOut = new PipedOutputStream(dataciteIn)) { dataciteOut.write(dataciteXml.getBytes(Charset.forName("utf-8"))); dataciteOut.close(); + success = true; } catch (Exception e) { logger.severe("Error creating datacite.xml: " + e.getMessage()); // TODO Auto-generated catch block e.printStackTrace(); - throw new RuntimeException("Error creating datacite.xml: " + e.getMessage()); + // throw new RuntimeException("Error creating datacite.xml: " + e.getMessage()); } } - }).start(); - //Have seen broken pipe in PostPublishDataset workflow without this delay - int i=0; - while(digestInputStream.available()<=0 && i<100) { + }); + dcThread.start(); + // Have seen broken pipe in PostPublishDataset workflow without this delay + int i = 0; + while (digestInputStream.available() <= 0 && i < 100) { Thread.sleep(10); i++; } - Blob dcXml = bucket.create(spaceName + "/datacite.v" + dv.getFriendlyVersionNumber()+".xml", digestInputStream, "text/xml", Bucket.BlobWriteOption.doesNotExist()); + Blob dcXml = bucket.create(spaceName + "/datacite.v" + dv.getFriendlyVersionNumber() + ".xml", digestInputStream, "text/xml", Bucket.BlobWriteOption.doesNotExist()); + + dcThread.join(); String checksum = dcXml.getMd5ToHexString(); logger.fine("Content: datacite.xml added with checksum: " + checksum); String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe(success ? checksum + " not equal to " + localchecksum : "datacite.xml transfer did not succeed"); + try { + dcXml.delete(Blob.BlobSourceOption.generationMatch()); + } catch (StorageException se) { + logger.warning(se.getMessage()); + } return new Failure("Error in transferring DataCite.xml file to GoogleCloud", "GoogleCloud Submission Failure: incomplete metadata transfer"); } // Store BagIt file + success = false; String fileName = spaceName + ".v" + dv.getFriendlyVersionNumber() + ".zip"; // Add BagIt ZIP file @@ -123,13 +135,14 @@ public void run() { BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); bagger.setAuthenticationKey(token.getTokenString()); bagger.generateBag(out); + success=true; } catch (Exception e) { logger.severe("Error creating bag: " + e.getMessage()); // TODO Auto-generated catch block e.printStackTrace(); try { digestInputStream2.close(); - } catch(Exception ex) { + } catch (Exception ex) { logger.warning(ex.getLocalizedMessage()); } throw new RuntimeException("Error creating bag: " + e.getMessage()); @@ -165,48 +178,46 @@ public void run() { * increased, and/or a change in how archives are sent to google (e.g. as * multiple blobs that get aggregated) would be required. */ - i=0; - while(digestInputStream2.available()<=90000 && i<2000 && writeThread.isAlive()) { + i = 0; + while (digestInputStream2.available() <= 90000 && i < 2000 && writeThread.isAlive()) { Thread.sleep(1000); logger.fine("avail: " + digestInputStream2.available() + " : " + writeThread.getState().toString()); i++; } logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available()); - if(i==2000) { + if (i == 2000) { throw new IOException("Stream not available"); } Blob bag = bucket.create(spaceName + "/" + fileName, digestInputStream2, "application/zip", Bucket.BlobWriteOption.doesNotExist()); - if(bag.getSize()==0) { + if (bag.getSize() == 0) { throw new IOException("Empty Bag"); } - blobIdString = bag.getBlobId().getBucket() + "/" + bag.getBlobId().getName(); + writeThread.join(); + checksum = bag.getMd5ToHexString(); logger.fine("Bag: " + fileName + " added with checksum: " + checksum); localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe(success ? checksum + " not equal to " + localchecksum : "bag transfer did not succeed"); + try { + bag.delete(Blob.BlobSourceOption.generationMatch()); + } catch (StorageException se) { + logger.warning(se.getMessage()); + } return new Failure("Error in transferring Zip file to GoogleCloud", "GoogleCloud Submission Failure: incomplete archive transfer"); } - } catch (RuntimeException rte) { - logger.severe("Error creating Bag during GoogleCloud archiving: " + rte.getMessage()); - return new Failure("Error in generating Bag", - "GoogleCloud Submission Failure: archive file not created"); } logger.fine("GoogleCloud Submission step: Content Transferred"); // Document the location of dataset archival copy location (actually the URL - // where you can - // view it as an admin) + // where you can view it as an admin) + // Changed to point at bucket where the zip and datacite.xml are visible StringBuffer sb = new StringBuffer("https://console.cloud.google.com/storage/browser/"); - sb.append(blobIdString); + sb.append(bucketName + "/" + spaceName); dv.setArchivalCopyLocation(sb.toString()); - } catch (RuntimeException rte) { - logger.severe("Error creating datacite xml file during GoogleCloud Archiving: " + rte.getMessage()); - return new Failure("Error in generating datacite.xml file", - "GoogleCloud Submission Failure: metadata file not created"); } } else { logger.warning("GoogleCloud Submision Workflow aborted: Dataset locked for pidRegister"); From cf673d1c12d75b6fdd9ac06396cac808ae0460ee Mon Sep 17 00:00:00 2001 From: qqmyers Date: Wed, 13 Apr 2022 18:52:00 -0400 Subject: [PATCH 2/5] remove bag thread support for now --- .../command/impl/DuraCloudSubmitToArchiveCommand.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java index b3b303d7407..de63eeca754 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java @@ -41,10 +41,8 @@ public class DuraCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveComm private static final String DURACLOUD_PORT = ":DuraCloudPort"; private static final String DURACLOUD_HOST = ":DuraCloudHost"; private static final String DURACLOUD_CONTEXT = ":DuraCloudContext"; - private static final int DEFAULT_THREADS = 2; boolean success = false; - int bagThreads = DEFAULT_THREADS; public DuraCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { super(aRequest, version); } @@ -59,14 +57,6 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t : DEFAULT_CONTEXT; String host = requestedSettings.get(DURACLOUD_HOST); - if (requestedSettings.get(BagGenerator.BAG_GENERATOR_THREADS) != null) { - try { - bagThreads=Integer.valueOf(requestedSettings.get(BagGenerator.BAG_GENERATOR_THREADS)); - } catch (NumberFormatException nfe) { - logger.warning("Can't parse the value of setting " + BagGenerator.BAG_GENERATOR_THREADS + " as an integer - using default:" + DEFAULT_THREADS); - } - } - if (host != null) { Dataset dataset = dv.getDataset(); // ToDo - change after HDC 3A changes to status reporting @@ -177,7 +167,6 @@ public void run() { try (PipedOutputStream out = new PipedOutputStream(in)) { // Generate bag BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); - bagger.setNumConnections(bagThreads); bagger.setAuthenticationKey(token.getTokenString()); bagger.generateBag(out); success = true; From 630a5e929923a1a1ece24bc3ea7b4f326d92d854 Mon Sep 17 00:00:00 2001 From: qqmyers Date: Fri, 13 May 2022 12:50:23 -0400 Subject: [PATCH 3/5] add/update wait for first zipped files to stream to avoid timeout also increased wait to 20K seconds (and use new constant) to help handle larger data --- .../impl/DuraCloudSubmitToArchiveCommand.java | 38 +++++++++++++++++-- .../GoogleCloudSubmitToArchiveCommand.java | 12 +++--- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java index de63eeca754..eabe9f326b2 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java @@ -41,6 +41,7 @@ public class DuraCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveComm private static final String DURACLOUD_PORT = ":DuraCloudPort"; private static final String DURACLOUD_HOST = ":DuraCloudHost"; private static final String DURACLOUD_CONTEXT = ":DuraCloudContext"; + private static final int MAX_ZIP_WAIT = 20000; boolean success = false; public DuraCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { @@ -160,7 +161,7 @@ public void run() { // transfer messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream in = new PipedInputStream(); + try (PipedInputStream in = new PipedInputStream(100000); DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { Thread bagThread = new Thread(new Runnable() { public void run() { @@ -178,11 +179,42 @@ public void run() { } }); bagThread.start(); + /* + * The following loop handles two issues. First, with no delay, the + * bucket.create() call below can get started before the piped streams are set + * up, causing a failure (seen when triggered in a PostPublishDataset workflow). + * A minimal initial wait, e.g. until some bytes are available, would address + * this. Second, the BagGenerator class, due to it's use of parallel streaming + * creation of the zip file, has the characteristic that it makes a few bytes + * available - from setting up the directory structure for the zip file - + * significantly earlier than it is ready to stream file content (e.g. for + * thousands of files and GB of content). If, for these large datasets, + * store.addContent() is called as soon as bytes are available, the call can + * timeout before the bytes for all the zipped files are available. To manage + * this, the loop waits until 90K bytes are available, larger than any expected + * dir structure for the zip and implying that the main zipped content is + * available, or until the thread terminates, with all of its content written to + * the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't + * want to test whether that means that exactly 100K bytes will be available() + * for large datasets or not, so the test below is at 90K.) + * + * An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator + * has been used to archive >120K files, 2K directories, and ~600GB files on the + * SEAD project (streaming content to disk rather than over an internet + * connection) which would take longer than 20K seconds (even 10+ hours) and might + * produce an initial set of bytes for directories > 90K. If Dataverse ever + * needs to support datasets of this size, the numbers here would need to be + * increased, and/or a change in how archives are sent to google (e.g. as + * multiple blobs that get aggregated) would be required. + */ i = 0; - while (digestInputStream.available() <= 0 && i < 100) { - Thread.sleep(10); + while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && bagThread.isAlive()) { + Thread.sleep(1000); i++; } + if(i==MAX_ZIP_WAIT) { + throw new IOException("Stream not available"); + } checksum = store.addContent(spaceName, fileName, digestInputStream2, -1l, null, null, null); bagThread.join(); if (success) { diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java index 6ea7afcc734..add3659fc8b 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java @@ -15,16 +15,13 @@ import edu.harvard.iq.dataverse.workflow.step.Failure; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; -import java.io.BufferedInputStream; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.nio.charset.Charset; import java.security.DigestInputStream; import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.logging.Logger; @@ -42,6 +39,7 @@ public class GoogleCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveCo private static final Logger logger = Logger.getLogger(GoogleCloudSubmitToArchiveCommand.class.getName()); private static final String GOOGLECLOUD_BUCKET = ":GoogleCloudBucket"; private static final String GOOGLECLOUD_PROJECT = ":GoogleCloudProject"; + private static final int MAX_ZIP_WAIT = 20000; boolean success = false; @@ -169,23 +167,23 @@ public void run() { * want to test whether that means that exactly 100K bytes will be available() * for large datasets or not, so the test below is at 90K.) * - * An additional sanity check limits the wait to 2K seconds. The BagGenerator + * An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator * has been used to archive >120K files, 2K directories, and ~600GB files on the * SEAD project (streaming content to disk rather than over an internet - * connection) which would take longer than 2K seconds (10+ hours) and might + * connection) which would take longer than 20K seconds (10+ hours) and might * produce an initial set of bytes for directories > 90K. If Dataverse ever * needs to support datasets of this size, the numbers here would need to be * increased, and/or a change in how archives are sent to google (e.g. as * multiple blobs that get aggregated) would be required. */ i = 0; - while (digestInputStream2.available() <= 90000 && i < 2000 && writeThread.isAlive()) { + while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && writeThread.isAlive()) { Thread.sleep(1000); logger.fine("avail: " + digestInputStream2.available() + " : " + writeThread.getState().toString()); i++; } logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available()); - if (i == 2000) { + if (i == MAX_ZIP_WAIT) { throw new IOException("Stream not available"); } Blob bag = bucket.create(spaceName + "/" + fileName, digestInputStream2, "application/zip", Bucket.BlobWriteOption.doesNotExist()); From f8ec7ad55b964693c44b1402b8f4a0b1ab269b7d Mon Sep 17 00:00:00 2001 From: qqmyers Date: Fri, 13 May 2022 13:25:37 -0400 Subject: [PATCH 4/5] refactor bag thread mgmt to base class. align code --- .../impl/AbstractSubmitToArchiveCommand.java | 75 ++++++++++++++++- .../impl/DuraCloudSubmitToArchiveCommand.java | 60 +------------ .../GoogleCloudSubmitToArchiveCommand.java | 84 +++---------------- 3 files changed, 88 insertions(+), 131 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java index 77ea680598f..8b035a563cb 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java @@ -1,7 +1,6 @@ package edu.harvard.iq.dataverse.engine.command.impl; import edu.harvard.iq.dataverse.DatasetVersion; -import edu.harvard.iq.dataverse.DvObject; import edu.harvard.iq.dataverse.authorization.Permission; import edu.harvard.iq.dataverse.authorization.users.ApiToken; import edu.harvard.iq.dataverse.authorization.users.AuthenticatedUser; @@ -11,9 +10,14 @@ import edu.harvard.iq.dataverse.engine.command.RequiredPermissions; import edu.harvard.iq.dataverse.engine.command.exception.CommandException; import edu.harvard.iq.dataverse.settings.SettingsServiceBean; +import edu.harvard.iq.dataverse.util.bagit.BagGenerator; +import edu.harvard.iq.dataverse.util.bagit.OREMap; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; -import java.util.Date; +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.security.DigestInputStream; import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; @@ -23,7 +27,9 @@ public abstract class AbstractSubmitToArchiveCommand extends AbstractCommand requestedSettings = new HashMap(); + protected boolean success=false; private static final Logger logger = Logger.getLogger(AbstractSubmitToArchiveCommand.class.getName()); + private static final int MAX_ZIP_WAIT = 20000; public AbstractSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { super(aRequest, version.getDataset()); @@ -73,4 +79,69 @@ public String describe() { + version.getFriendlyVersionNumber()+")]"; } + public Thread startBagThread(DatasetVersion dv, PipedInputStream in, DigestInputStream digestInputStream2, + String dataciteXml, ApiToken token) throws IOException, InterruptedException { + Thread bagThread = new Thread(new Runnable() { + public void run() { + try (PipedOutputStream out = new PipedOutputStream(in)) { + // Generate bag + BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); + bagger.setAuthenticationKey(token.getTokenString()); + bagger.generateBag(out); + success = true; + } catch (Exception e) { + logger.severe("Error creating bag: " + e.getMessage()); + // TODO Auto-generated catch block + e.printStackTrace(); + try { + digestInputStream2.close(); + } catch (Exception ex) { + logger.warning(ex.getLocalizedMessage()); + } + throw new RuntimeException("Error creating bag: " + e.getMessage()); + } + } + }); + bagThread.start(); + /* + * The following loop handles two issues. First, with no delay, the + * bucket.create() call below can get started before the piped streams are set + * up, causing a failure (seen when triggered in a PostPublishDataset workflow). + * A minimal initial wait, e.g. until some bytes are available, would address + * this. Second, the BagGenerator class, due to it's use of parallel streaming + * creation of the zip file, has the characteristic that it makes a few bytes + * available - from setting up the directory structure for the zip file - + * significantly earlier than it is ready to stream file content (e.g. for + * thousands of files and GB of content). If, for these large datasets, + * the transfer is started as soon as bytes are available, the call can + * timeout before the bytes for all the zipped files are available. To manage + * this, the loop waits until 90K bytes are available, larger than any expected + * dir structure for the zip and implying that the main zipped content is + * available, or until the thread terminates, with all of its content written to + * the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't + * want to test whether that means that exactly 100K bytes will be available() + * for large datasets or not, so the test below is at 90K.) + * + * An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator + * has been used to archive >120K files, 2K directories, and ~600GB files on the + * SEAD project (streaming content to disk rather than over an internet + * connection) which would take longer than 20K seconds (even 10+ hours) and might + * produce an initial set of bytes for directories > 90K. If Dataverse ever + * needs to support datasets of this size, the numbers here would need to be + * increased, and/or a change in how archives are sent to google (e.g. as + * multiple blobs that get aggregated) would be required. + */ + int i = 0; + while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && bagThread.isAlive()) { + Thread.sleep(1000); + logger.fine("avail: " + digestInputStream2.available() + " : " + bagThread.getState().toString()); + i++; + } + logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available()); + if(i==MAX_ZIP_WAIT) { + throw new IOException("Stream not available"); + } + return bagThread; + } + } diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java index eabe9f326b2..4b780527325 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java @@ -10,8 +10,6 @@ import edu.harvard.iq.dataverse.engine.command.Command; import edu.harvard.iq.dataverse.engine.command.DataverseRequest; import edu.harvard.iq.dataverse.engine.command.RequiredPermissions; -import edu.harvard.iq.dataverse.util.bagit.BagGenerator; -import edu.harvard.iq.dataverse.util.bagit.OREMap; import edu.harvard.iq.dataverse.workflow.step.Failure; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; @@ -41,9 +39,7 @@ public class DuraCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveComm private static final String DURACLOUD_PORT = ":DuraCloudPort"; private static final String DURACLOUD_HOST = ":DuraCloudHost"; private static final String DURACLOUD_CONTEXT = ":DuraCloudContext"; - private static final int MAX_ZIP_WAIT = 20000; - - boolean success = false; + public DuraCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { super(aRequest, version); } @@ -163,58 +159,7 @@ public void run() { messageDigest = MessageDigest.getInstance("MD5"); try (PipedInputStream in = new PipedInputStream(100000); DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { - Thread bagThread = new Thread(new Runnable() { - public void run() { - try (PipedOutputStream out = new PipedOutputStream(in)) { - // Generate bag - BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); - bagger.setAuthenticationKey(token.getTokenString()); - bagger.generateBag(out); - success = true; - } catch (Exception e) { - logger.severe("Error creating bag: " + e.getMessage()); - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - }); - bagThread.start(); - /* - * The following loop handles two issues. First, with no delay, the - * bucket.create() call below can get started before the piped streams are set - * up, causing a failure (seen when triggered in a PostPublishDataset workflow). - * A minimal initial wait, e.g. until some bytes are available, would address - * this. Second, the BagGenerator class, due to it's use of parallel streaming - * creation of the zip file, has the characteristic that it makes a few bytes - * available - from setting up the directory structure for the zip file - - * significantly earlier than it is ready to stream file content (e.g. for - * thousands of files and GB of content). If, for these large datasets, - * store.addContent() is called as soon as bytes are available, the call can - * timeout before the bytes for all the zipped files are available. To manage - * this, the loop waits until 90K bytes are available, larger than any expected - * dir structure for the zip and implying that the main zipped content is - * available, or until the thread terminates, with all of its content written to - * the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't - * want to test whether that means that exactly 100K bytes will be available() - * for large datasets or not, so the test below is at 90K.) - * - * An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator - * has been used to archive >120K files, 2K directories, and ~600GB files on the - * SEAD project (streaming content to disk rather than over an internet - * connection) which would take longer than 20K seconds (even 10+ hours) and might - * produce an initial set of bytes for directories > 90K. If Dataverse ever - * needs to support datasets of this size, the numbers here would need to be - * increased, and/or a change in how archives are sent to google (e.g. as - * multiple blobs that get aggregated) would be required. - */ - i = 0; - while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && bagThread.isAlive()) { - Thread.sleep(1000); - i++; - } - if(i==MAX_ZIP_WAIT) { - throw new IOException("Stream not available"); - } + Thread bagThread = startBagThread(dv, in, digestInputStream2, dataciteXml, token); checksum = store.addContent(spaceName, fileName, digestInputStream2, -1l, null, null, null); bagThread.join(); if (success) { @@ -281,5 +226,4 @@ public void run() { return new Failure("DuraCloud Submission not configured - no \":DuraCloudHost\"."); } } - } diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java index add3659fc8b..7eb09452abb 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java @@ -10,8 +10,6 @@ import edu.harvard.iq.dataverse.engine.command.Command; import edu.harvard.iq.dataverse.engine.command.DataverseRequest; import edu.harvard.iq.dataverse.engine.command.RequiredPermissions; -import edu.harvard.iq.dataverse.util.bagit.BagGenerator; -import edu.harvard.iq.dataverse.util.bagit.OREMap; import edu.harvard.iq.dataverse.workflow.step.Failure; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; @@ -39,9 +37,6 @@ public class GoogleCloudSubmitToArchiveCommand extends AbstractSubmitToArchiveCo private static final Logger logger = Logger.getLogger(GoogleCloudSubmitToArchiveCommand.class.getName()); private static final String GOOGLECLOUD_BUCKET = ":GoogleCloudBucket"; private static final String GOOGLECLOUD_PROJECT = ":GoogleCloudProject"; - private static final int MAX_ZIP_WAIT = 20000; - - boolean success = false; public GoogleCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { super(aRequest, version); @@ -75,7 +70,8 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t String dataciteXml = DOIDataCiteRegisterService.getMetadataFromDvObject( dv.getDataset().getGlobalId().asString(), metadata, dv.getDataset()); MessageDigest messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream dataciteIn = new PipedInputStream(); DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { + try (PipedInputStream dataciteIn = new PipedInputStream(); + DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { // Add datacite.xml file Thread dcThread = new Thread(new Runnable() { @@ -94,7 +90,8 @@ public void run() { } }); dcThread.start(); - // Have seen broken pipe in PostPublishDataset workflow without this delay + // Have seen Pipe Closed errors for other archivers when used as a workflow + // without this delay loop int i = 0; while (digestInputStream.available() <= 0 && i < 100) { Thread.sleep(10); @@ -107,6 +104,7 @@ public void run() { logger.fine("Content: datacite.xml added with checksum: " + checksum); String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest()); if (!success || !checksum.equals(localchecksum)) { + logger.severe("Failure on " + spaceName); logger.severe(success ? checksum + " not equal to " + localchecksum : "datacite.xml transfer did not succeed"); try { dcXml.delete(Blob.BlobSourceOption.generationMatch()); @@ -125,78 +123,22 @@ public void run() { // Google uses MD5 as one way to verify the // transfer messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream in = new PipedInputStream(100000); DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest);) { - Thread writeThread = new Thread(new Runnable() { - public void run() { - try (PipedOutputStream out = new PipedOutputStream(in)) { - // Generate bag - BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); - bagger.setAuthenticationKey(token.getTokenString()); - bagger.generateBag(out); - success=true; - } catch (Exception e) { - logger.severe("Error creating bag: " + e.getMessage()); - // TODO Auto-generated catch block - e.printStackTrace(); - try { - digestInputStream2.close(); - } catch (Exception ex) { - logger.warning(ex.getLocalizedMessage()); - } - throw new RuntimeException("Error creating bag: " + e.getMessage()); - } - } - }); - writeThread.start(); - /* - * The following loop handles two issues. First, with no delay, the - * bucket.create() call below can get started before the piped streams are set - * up, causing a failure (seen when triggered in a PostPublishDataset workflow). - * A minimal initial wait, e.g. until some bytes are available, would address - * this. Second, the BagGenerator class, due to it's use of parallel streaming - * creation of the zip file, has the characteristic that it makes a few bytes - * available - from setting up the directory structure for the zip file - - * significantly earlier than it is ready to stream file content (e.g. for - * thousands of files and GB of content). If, for these large datasets, - * bucket.create() is called as soon as bytes are available, the call can - * timeout before the bytes for all the zipped files are available. To manage - * this, the loop waits until 90K bytes are available, larger than any expected - * dir structure for the zip and implying that the main zipped content is - * available, or until the thread terminates, with all of its content written to - * the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't - * want to test whether that means that exactly 100K bytes will be available() - * for large datasets or not, so the test below is at 90K.) - * - * An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator - * has been used to archive >120K files, 2K directories, and ~600GB files on the - * SEAD project (streaming content to disk rather than over an internet - * connection) which would take longer than 20K seconds (10+ hours) and might - * produce an initial set of bytes for directories > 90K. If Dataverse ever - * needs to support datasets of this size, the numbers here would need to be - * increased, and/or a change in how archives are sent to google (e.g. as - * multiple blobs that get aggregated) would be required. - */ - i = 0; - while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && writeThread.isAlive()) { - Thread.sleep(1000); - logger.fine("avail: " + digestInputStream2.available() + " : " + writeThread.getState().toString()); - i++; - } - logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available()); - if (i == MAX_ZIP_WAIT) { - throw new IOException("Stream not available"); - } - Blob bag = bucket.create(spaceName + "/" + fileName, digestInputStream2, "application/zip", Bucket.BlobWriteOption.doesNotExist()); + try (PipedInputStream in = new PipedInputStream(100000); + DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { + Thread bagThread = startBagThread(dv, in, digestInputStream2, dataciteXml, token); + Blob bag = bucket.create(spaceName + "/" + fileName, digestInputStream2, "application/zip", + Bucket.BlobWriteOption.doesNotExist()); if (bag.getSize() == 0) { throw new IOException("Empty Bag"); } - writeThread.join(); + bagThread.join(); checksum = bag.getMd5ToHexString(); logger.fine("Bag: " + fileName + " added with checksum: " + checksum); localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); if (!success || !checksum.equals(localchecksum)) { - logger.severe(success ? checksum + " not equal to " + localchecksum : "bag transfer did not succeed"); + logger.severe(success ? checksum + " not equal to " + localchecksum + : "bag transfer did not succeed"); try { bag.delete(Blob.BlobSourceOption.generationMatch()); } catch (StorageException se) { From eab9b0de9ed5a92e3b119b5b89442545dcc6dc0d Mon Sep 17 00:00:00 2001 From: qqmyers Date: Fri, 13 May 2022 14:54:49 -0400 Subject: [PATCH 5/5] increment DuraCloud lib version --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index f89c30c2ae5..5b9b4e23310 100644 --- a/pom.xml +++ b/pom.xml @@ -463,7 +463,7 @@ org.duracloud common - 7.1.0 + 7.1.1 org.slf4j @@ -478,7 +478,7 @@ org.duracloud storeclient - 7.1.0 + 7.1.1 org.slf4j