diff --git a/pom.xml b/pom.xml index ce9f1c4b63d..41e6155c15f 100644 --- a/pom.xml +++ b/pom.xml @@ -463,7 +463,7 @@ org.duracloud common - 7.1.0 + 7.1.1 org.slf4j @@ -478,7 +478,7 @@ org.duracloud storeclient - 7.1.0 + 7.1.1 org.slf4j diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java index 4fa0961d134..3574526c910 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/AbstractSubmitToArchiveCommand.java @@ -1,7 +1,6 @@ package edu.harvard.iq.dataverse.engine.command.impl; import edu.harvard.iq.dataverse.DatasetVersion; -import edu.harvard.iq.dataverse.DvObject; import edu.harvard.iq.dataverse.authorization.Permission; import edu.harvard.iq.dataverse.authorization.users.ApiToken; import edu.harvard.iq.dataverse.authorization.users.AuthenticatedUser; @@ -12,9 +11,13 @@ import edu.harvard.iq.dataverse.engine.command.exception.CommandException; import edu.harvard.iq.dataverse.settings.SettingsServiceBean; import edu.harvard.iq.dataverse.util.bagit.BagGenerator; +import edu.harvard.iq.dataverse.util.bagit.OREMap; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; -import java.util.Date; +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.security.DigestInputStream; import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; @@ -24,7 +27,9 @@ public abstract class AbstractSubmitToArchiveCommand extends AbstractCommand requestedSettings = new HashMap(); + protected boolean success=false; private static final Logger logger = Logger.getLogger(AbstractSubmitToArchiveCommand.class.getName()); + private static final int MAX_ZIP_WAIT = 20000; private static final int DEFAULT_THREADS = 2; public AbstractSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion version) { @@ -87,4 +92,70 @@ public String describe() { + version.getFriendlyVersionNumber()+")]"; } + public Thread startBagThread(DatasetVersion dv, PipedInputStream in, DigestInputStream digestInputStream2, + String dataciteXml, ApiToken token) throws IOException, InterruptedException { + Thread bagThread = new Thread(new Runnable() { + public void run() { + try (PipedOutputStream out = new PipedOutputStream(in)) { + // Generate bag + BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); + bagger.setNumConnections(getNumberOfBagGeneratorThreads()); + bagger.setAuthenticationKey(token.getTokenString()); + bagger.generateBag(out); + success = true; + } catch (Exception e) { + logger.severe("Error creating bag: " + e.getMessage()); + // TODO Auto-generated catch block + e.printStackTrace(); + try { + digestInputStream2.close(); + } catch (Exception ex) { + logger.warning(ex.getLocalizedMessage()); + } + throw new RuntimeException("Error creating bag: " + e.getMessage()); + } + } + }); + bagThread.start(); + /* + * The following loop handles two issues. First, with no delay, the + * bucket.create() call below can get started before the piped streams are set + * up, causing a failure (seen when triggered in a PostPublishDataset workflow). + * A minimal initial wait, e.g. until some bytes are available, would address + * this. Second, the BagGenerator class, due to it's use of parallel streaming + * creation of the zip file, has the characteristic that it makes a few bytes + * available - from setting up the directory structure for the zip file - + * significantly earlier than it is ready to stream file content (e.g. for + * thousands of files and GB of content). If, for these large datasets, + * the transfer is started as soon as bytes are available, the call can + * timeout before the bytes for all the zipped files are available. To manage + * this, the loop waits until 90K bytes are available, larger than any expected + * dir structure for the zip and implying that the main zipped content is + * available, or until the thread terminates, with all of its content written to + * the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't + * want to test whether that means that exactly 100K bytes will be available() + * for large datasets or not, so the test below is at 90K.) + * + * An additional sanity check limits the wait to 20K (MAX_ZIP_WAIT) seconds. The BagGenerator + * has been used to archive >120K files, 2K directories, and ~600GB files on the + * SEAD project (streaming content to disk rather than over an internet + * connection) which would take longer than 20K seconds (even 10+ hours) and might + * produce an initial set of bytes for directories > 90K. If Dataverse ever + * needs to support datasets of this size, the numbers here would need to be + * increased, and/or a change in how archives are sent to google (e.g. as + * multiple blobs that get aggregated) would be required. + */ + int i = 0; + while (digestInputStream2.available() <= 90000 && i < MAX_ZIP_WAIT && bagThread.isAlive()) { + Thread.sleep(1000); + logger.fine("avail: " + digestInputStream2.available() + " : " + bagThread.getState().toString()); + i++; + } + logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available()); + if(i==MAX_ZIP_WAIT) { + throw new IOException("Stream not available"); + } + return bagThread; + } + } diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java index f30183663e6..50667993bdc 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/DuraCloudSubmitToArchiveCommand.java @@ -10,8 +10,6 @@ import edu.harvard.iq.dataverse.engine.command.Command; import edu.harvard.iq.dataverse.engine.command.DataverseRequest; import edu.harvard.iq.dataverse.engine.command.RequiredPermissions; -import edu.harvard.iq.dataverse.util.bagit.BagGenerator; -import edu.harvard.iq.dataverse.util.bagit.OREMap; import edu.harvard.iq.dataverse.workflow.step.Failure; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; @@ -48,13 +46,21 @@ public DuraCloudSubmitToArchiveCommand(DataverseRequest aRequest, DatasetVersion } @Override - public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken token, Map requestedSettings) { + public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken token, + Map requestedSettings) { - String port = requestedSettings.get(DURACLOUD_PORT) != null ? requestedSettings.get(DURACLOUD_PORT) : DEFAULT_PORT; - String dpnContext = requestedSettings.get(DURACLOUD_CONTEXT) != null ? requestedSettings.get(DURACLOUD_CONTEXT) : DEFAULT_CONTEXT; + String port = requestedSettings.get(DURACLOUD_PORT) != null ? requestedSettings.get(DURACLOUD_PORT) + : DEFAULT_PORT; + String dpnContext = requestedSettings.get(DURACLOUD_CONTEXT) != null ? requestedSettings.get(DURACLOUD_CONTEXT) + : DEFAULT_CONTEXT; String host = requestedSettings.get(DURACLOUD_HOST); + if (host != null) { Dataset dataset = dv.getDataset(); + // ToDo - change after HDC 3A changes to status reporting + // This will make the archivalCopyLocation non-null after a failure which should + // stop retries + dv.setArchivalCopyLocation("Attempted"); if (dataset.getLockFor(Reason.finalizePublication) == null && dataset.getLockFor(Reason.FileValidationFailed) == null) { // Use Duracloud client classes to login @@ -62,9 +68,24 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t Credential credential = new Credential(System.getProperty("duracloud.username"), System.getProperty("duracloud.password")); storeManager.login(credential); - - String spaceName = dataset.getGlobalId().asString().replace(':', '-').replace('/', '-') - .replace('.', '-').toLowerCase(); + /* + * Aliases can contain upper case characters which are not allowed in space + * names. Similarly, aliases can contain '_' which isn't allowed in a space + * name. The line below replaces any upper case chars with lowercase and + * replaces any '_' with '.-' . The '-' after the dot assures we don't break the + * rule that + * "The last period in a aspace may not immediately be followed by a number". + * (Although we could check, it seems better to just add '.-' all the time.As + * written the replaceAll will also change any chars not valid in a spaceName to + * '.' which would avoid code breaking if the alias constraints change. That + * said, this line may map more than one alias to the same spaceName, e.g. + * "test" and "Test" aliases both map to the "test" space name. This does not + * break anything but does potentially put bags from more than one collection in + * the same space. + */ + String spaceName = dataset.getOwner().getAlias().toLowerCase().replaceAll("[^a-z0-9-]", ".dcsafe"); + String baseFileName = dataset.getGlobalId().asString().replace(':', '-').replace('/', '-') + .replace('.', '-').toLowerCase() + "_v" + dv.getFriendlyVersionNumber(); ContentStore store; try { @@ -76,88 +97,88 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t */ store = storeManager.getPrimaryContentStore(); // Create space to copy archival files to - store.createSpace(spaceName); + if (!store.spaceExists(spaceName)) { + store.createSpace(spaceName); + } DataCitation dc = new DataCitation(dv); Map metadata = dc.getDataCiteMetadata(); String dataciteXml = DOIDataCiteRegisterService.getMetadataFromDvObject( dv.getDataset().getGlobalId().asString(), metadata, dv.getDataset()); MessageDigest messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream dataciteIn = new PipedInputStream(); DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { + try (PipedInputStream dataciteIn = new PipedInputStream(); + DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { // Add datacite.xml file - new Thread(new Runnable() { + Thread dcThread = new Thread(new Runnable() { public void run() { try (PipedOutputStream dataciteOut = new PipedOutputStream(dataciteIn)) { dataciteOut.write(dataciteXml.getBytes(Charset.forName("utf-8"))); dataciteOut.close(); + success=true; } catch (Exception e) { logger.severe("Error creating datacite.xml: " + e.getMessage()); // TODO Auto-generated catch block e.printStackTrace(); - throw new RuntimeException("Error creating datacite.xml: " + e.getMessage()); } } - }).start(); - //Have seen Pipe Closed errors for other archivers when used as a workflow without this delay loop - int i=0; - while(digestInputStream.available()<=0 && i<100) { + }); + dcThread.start(); + // Have seen Pipe Closed errors for other archivers when used as a workflow + // without this delay loop + int i = 0; + while (digestInputStream.available() <= 0 && i < 100) { Thread.sleep(10); i++; } - String checksum = store.addContent(spaceName, "datacite.xml", digestInputStream, -1l, null, null, - null); + String checksum = store.addContent(spaceName, baseFileName + "_datacite.xml", digestInputStream, + -1l, null, null, null); logger.fine("Content: datacite.xml added with checksum: " + checksum); + dcThread.join(); String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe("Failure on " + baseFileName); + logger.severe(success ? checksum + " not equal to " + localchecksum : "failed to transfer to DuraCloud"); + try { + store.deleteContent(spaceName, baseFileName + "_datacite.xml"); + } catch (ContentStoreException cse) { + logger.warning(cse.getMessage()); + } return new Failure("Error in transferring DataCite.xml file to DuraCloud", "DuraCloud Submission Failure: incomplete metadata transfer"); } // Store BagIt file - String fileName = spaceName + "v" + dv.getFriendlyVersionNumber() + ".zip"; + success = false; + String fileName = baseFileName + ".zip"; // Add BagIt ZIP file // Although DuraCloud uses SHA-256 internally, it's API uses MD5 to verify the // transfer + messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream in = new PipedInputStream(); DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { - new Thread(new Runnable() { - public void run() { - try (PipedOutputStream out = new PipedOutputStream(in)){ - // Generate bag - BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); - bagger.setNumConnections(getNumberOfBagGeneratorThreads()); - bagger.setAuthenticationKey(token.getTokenString()); - bagger.generateBag(out); - } catch (Exception e) { - logger.severe("Error creating bag: " + e.getMessage()); - // TODO Auto-generated catch block - e.printStackTrace(); - throw new RuntimeException("Error creating bag: " + e.getMessage()); - } - } - }).start(); - i=0; - while(digestInputStream.available()<=0 && i<100) { - Thread.sleep(10); - i++; + try (PipedInputStream in = new PipedInputStream(100000); + DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { + Thread bagThread = startBagThread(dv, in, digestInputStream2, dataciteXml, token); + checksum = store.addContent(spaceName, fileName, digestInputStream2, -1l, null, null, null); + bagThread.join(); + if (success) { + logger.fine("Content: " + fileName + " added with checksum: " + checksum); + localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); } - checksum = store.addContent(spaceName, fileName, digestInputStream2, -1l, null, null, - null); - logger.fine("Content: " + fileName + " added with checksum: " + checksum); - localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe("Failure on " + fileName); + logger.severe(success ? checksum + " not equal to " + localchecksum : "failed to transfer to DuraCloud"); + try { + store.deleteContent(spaceName, fileName); + store.deleteContent(spaceName, baseFileName + "_datacite.xml"); + } catch (ContentStoreException cse) { + logger.warning(cse.getMessage()); + } return new Failure("Error in transferring Zip file to DuraCloud", "DuraCloud Submission Failure: incomplete archive transfer"); } - } catch (RuntimeException rte) { - logger.severe(rte.getMessage()); - return new Failure("Error in generating Bag", - "DuraCloud Submission Failure: archive file not created"); } logger.fine("DuraCloud Submission step: Content Transferred"); @@ -181,10 +202,6 @@ public void run() { e.printStackTrace(); return new Failure("Error in transferring file to DuraCloud", "DuraCloud Submission Failure: archive file not transferred"); - } catch (RuntimeException rte) { - logger.severe(rte.getMessage()); - return new Failure("Error in generating datacite.xml file", - "DuraCloud Submission Failure: metadata file not created"); } catch (InterruptedException e) { logger.warning(e.getLocalizedMessage()); e.printStackTrace(); @@ -196,12 +213,13 @@ public void run() { if (!(1 == dv.getVersion()) || !(0 == dv.getMinorVersionNumber())) { mesg = mesg + ": Prior Version archiving not yet complete?"; } - return new Failure("Unable to create DuraCloud space with name: " + spaceName, mesg); + return new Failure("Unable to create DuraCloud space with name: " + baseFileName, mesg); } catch (NoSuchAlgorithmException e) { logger.severe("MD5 MessageDigest not available!"); } } else { - logger.warning("DuraCloud Submision Workflow aborted: Dataset locked for finalizePublication, or because file validation failed"); + logger.warning( + "DuraCloud Submision Workflow aborted: Dataset locked for finalizePublication, or because file validation failed"); return new Failure("Dataset locked"); } return WorkflowStepResult.OK; @@ -209,5 +227,4 @@ public void run() { return new Failure("DuraCloud Submission not configured - no \":DuraCloudHost\"."); } } - } diff --git a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java index af4c960c2d6..7eb09452abb 100644 --- a/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java +++ b/src/main/java/edu/harvard/iq/dataverse/engine/command/impl/GoogleCloudSubmitToArchiveCommand.java @@ -10,21 +10,16 @@ import edu.harvard.iq.dataverse.engine.command.Command; import edu.harvard.iq.dataverse.engine.command.DataverseRequest; import edu.harvard.iq.dataverse.engine.command.RequiredPermissions; -import edu.harvard.iq.dataverse.util.bagit.BagGenerator; -import edu.harvard.iq.dataverse.util.bagit.OREMap; import edu.harvard.iq.dataverse.workflow.step.Failure; import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult; -import java.io.BufferedInputStream; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.nio.charset.Charset; import java.security.DigestInputStream; import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.logging.Logger; @@ -33,6 +28,7 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; @RequiredPermissions(Permission.PublishDataset) @@ -55,7 +51,7 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t if (bucketName != null && projectName != null) { Storage storage; try { - FileInputStream fis = new FileInputStream(System.getProperty("dataverse.files.directory") + System.getProperty("file.separator")+ "googlecloudkey.json"); + FileInputStream fis = new FileInputStream(System.getProperty("dataverse.files.directory") + System.getProperty("file.separator") + "googlecloudkey.json"); storage = StorageOptions.newBuilder() .setCredentials(ServiceAccountCredentials.fromStream(fis)) .setProjectId(projectName) @@ -73,141 +69,95 @@ public WorkflowStepResult performArchiveSubmission(DatasetVersion dv, ApiToken t Map metadata = dc.getDataCiteMetadata(); String dataciteXml = DOIDataCiteRegisterService.getMetadataFromDvObject( dv.getDataset().getGlobalId().asString(), metadata, dv.getDataset()); - String blobIdString = null; MessageDigest messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream dataciteIn = new PipedInputStream(); DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { + try (PipedInputStream dataciteIn = new PipedInputStream(); + DigestInputStream digestInputStream = new DigestInputStream(dataciteIn, messageDigest)) { // Add datacite.xml file - new Thread(new Runnable() { + Thread dcThread = new Thread(new Runnable() { public void run() { try (PipedOutputStream dataciteOut = new PipedOutputStream(dataciteIn)) { dataciteOut.write(dataciteXml.getBytes(Charset.forName("utf-8"))); dataciteOut.close(); + success = true; } catch (Exception e) { logger.severe("Error creating datacite.xml: " + e.getMessage()); // TODO Auto-generated catch block e.printStackTrace(); - throw new RuntimeException("Error creating datacite.xml: " + e.getMessage()); + // throw new RuntimeException("Error creating datacite.xml: " + e.getMessage()); } } - }).start(); - //Have seen broken pipe in PostPublishDataset workflow without this delay - int i=0; - while(digestInputStream.available()<=0 && i<100) { + }); + dcThread.start(); + // Have seen Pipe Closed errors for other archivers when used as a workflow + // without this delay loop + int i = 0; + while (digestInputStream.available() <= 0 && i < 100) { Thread.sleep(10); i++; } - Blob dcXml = bucket.create(spaceName + "/datacite.v" + dv.getFriendlyVersionNumber()+".xml", digestInputStream, "text/xml", Bucket.BlobWriteOption.doesNotExist()); + Blob dcXml = bucket.create(spaceName + "/datacite.v" + dv.getFriendlyVersionNumber() + ".xml", digestInputStream, "text/xml", Bucket.BlobWriteOption.doesNotExist()); + + dcThread.join(); String checksum = dcXml.getMd5ToHexString(); logger.fine("Content: datacite.xml added with checksum: " + checksum); String localchecksum = Hex.encodeHexString(digestInputStream.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe("Failure on " + spaceName); + logger.severe(success ? checksum + " not equal to " + localchecksum : "datacite.xml transfer did not succeed"); + try { + dcXml.delete(Blob.BlobSourceOption.generationMatch()); + } catch (StorageException se) { + logger.warning(se.getMessage()); + } return new Failure("Error in transferring DataCite.xml file to GoogleCloud", "GoogleCloud Submission Failure: incomplete metadata transfer"); } // Store BagIt file + success = false; String fileName = spaceName + ".v" + dv.getFriendlyVersionNumber() + ".zip"; // Add BagIt ZIP file // Google uses MD5 as one way to verify the // transfer messageDigest = MessageDigest.getInstance("MD5"); - try (PipedInputStream in = new PipedInputStream(100000); DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest);) { - Thread writeThread = new Thread(new Runnable() { - public void run() { - try (PipedOutputStream out = new PipedOutputStream(in)) { - // Generate bag - BagGenerator bagger = new BagGenerator(new OREMap(dv, false), dataciteXml); - bagger.setNumConnections(getNumberOfBagGeneratorThreads()); - bagger.setAuthenticationKey(token.getTokenString()); - bagger.generateBag(out); - } catch (Exception e) { - logger.severe("Error creating bag: " + e.getMessage()); - // TODO Auto-generated catch block - e.printStackTrace(); - try { - digestInputStream2.close(); - } catch(Exception ex) { - logger.warning(ex.getLocalizedMessage()); - } - throw new RuntimeException("Error creating bag: " + e.getMessage()); - } - } - }); - writeThread.start(); - /* - * The following loop handles two issues. First, with no delay, the - * bucket.create() call below can get started before the piped streams are set - * up, causing a failure (seen when triggered in a PostPublishDataset workflow). - * A minimal initial wait, e.g. until some bytes are available, would address - * this. Second, the BagGenerator class, due to it's use of parallel streaming - * creation of the zip file, has the characteristic that it makes a few bytes - * available - from setting up the directory structure for the zip file - - * significantly earlier than it is ready to stream file content (e.g. for - * thousands of files and GB of content). If, for these large datasets, - * bucket.create() is called as soon as bytes are available, the call can - * timeout before the bytes for all the zipped files are available. To manage - * this, the loop waits until 90K bytes are available, larger than any expected - * dir structure for the zip and implying that the main zipped content is - * available, or until the thread terminates, with all of its content written to - * the pipe. (Note the PipedInputStream buffer is set at 100K above - I didn't - * want to test whether that means that exactly 100K bytes will be available() - * for large datasets or not, so the test below is at 90K.) - * - * An additional sanity check limits the wait to 2K seconds. The BagGenerator - * has been used to archive >120K files, 2K directories, and ~600GB files on the - * SEAD project (streaming content to disk rather than over an internet - * connection) which would take longer than 2K seconds (10+ hours) and might - * produce an initial set of bytes for directories > 90K. If Dataverse ever - * needs to support datasets of this size, the numbers here would need to be - * increased, and/or a change in how archives are sent to google (e.g. as - * multiple blobs that get aggregated) would be required. - */ - i=0; - while(digestInputStream2.available()<=90000 && i<2000 && writeThread.isAlive()) { - Thread.sleep(1000); - logger.fine("avail: " + digestInputStream2.available() + " : " + writeThread.getState().toString()); - i++; - } - logger.fine("Bag: transfer started, i=" + i + ", avail = " + digestInputStream2.available()); - if(i==2000) { - throw new IOException("Stream not available"); - } - Blob bag = bucket.create(spaceName + "/" + fileName, digestInputStream2, "application/zip", Bucket.BlobWriteOption.doesNotExist()); - if(bag.getSize()==0) { + try (PipedInputStream in = new PipedInputStream(100000); + DigestInputStream digestInputStream2 = new DigestInputStream(in, messageDigest)) { + Thread bagThread = startBagThread(dv, in, digestInputStream2, dataciteXml, token); + Blob bag = bucket.create(spaceName + "/" + fileName, digestInputStream2, "application/zip", + Bucket.BlobWriteOption.doesNotExist()); + if (bag.getSize() == 0) { throw new IOException("Empty Bag"); } - blobIdString = bag.getBlobId().getBucket() + "/" + bag.getBlobId().getName(); + bagThread.join(); + checksum = bag.getMd5ToHexString(); logger.fine("Bag: " + fileName + " added with checksum: " + checksum); localchecksum = Hex.encodeHexString(digestInputStream2.getMessageDigest().digest()); - if (!checksum.equals(localchecksum)) { - logger.severe(checksum + " not equal to " + localchecksum); + if (!success || !checksum.equals(localchecksum)) { + logger.severe(success ? checksum + " not equal to " + localchecksum + : "bag transfer did not succeed"); + try { + bag.delete(Blob.BlobSourceOption.generationMatch()); + } catch (StorageException se) { + logger.warning(se.getMessage()); + } return new Failure("Error in transferring Zip file to GoogleCloud", "GoogleCloud Submission Failure: incomplete archive transfer"); } - } catch (RuntimeException rte) { - logger.severe("Error creating Bag during GoogleCloud archiving: " + rte.getMessage()); - return new Failure("Error in generating Bag", - "GoogleCloud Submission Failure: archive file not created"); } logger.fine("GoogleCloud Submission step: Content Transferred"); // Document the location of dataset archival copy location (actually the URL - // where you can - // view it as an admin) + // where you can view it as an admin) + // Changed to point at bucket where the zip and datacite.xml are visible StringBuffer sb = new StringBuffer("https://console.cloud.google.com/storage/browser/"); - sb.append(blobIdString); + sb.append(bucketName + "/" + spaceName); dv.setArchivalCopyLocation(sb.toString()); - } catch (RuntimeException rte) { - logger.severe("Error creating datacite xml file during GoogleCloud Archiving: " + rte.getMessage()); - return new Failure("Error in generating datacite.xml file", - "GoogleCloud Submission Failure: metadata file not created"); } } else { logger.warning("GoogleCloud Submision Workflow aborted: Dataset locked for pidRegister");