@@ -39,7 +39,30 @@ public interface DataSegmentPusher
@Deprecated
String getPathForHadoop(String dataSource);
String getPathForHadoop();
DataSegment push(File file, DataSegment segment) throws IOException;

/**
* Pushes index files and the segment descriptor to deep storage.
* @param file directory containing index files
* @param segment segment descriptor
* @param replaceExisting overwrites existing objects if true, otherwise leaves existing objects unchanged on
* conflict. Which value is correct depends on the indexer's semantics. For example, Tranquility
* does not guarantee that replica tasks will generate indexes with the same data, so the first
* segment pushed should be favored; otherwise multiple historicals may load segments with the
* same identifier but different contents. On the other hand, an indexer that maintains
* exactly-once semantics by storing checkpoint data can lose or repeat data if it fails to
* write a segment because the segment already exists and overwriting is not permitted. That
* situation can occur when a task fails after pushing to deep storage but before writing to
* the metadata storage; see: https://github.com/druid-io/druid/issues/5161.
*
* If replaceExisting is true, existing objects MUST be overwritten, since failure to do so
* will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be
* kept, but it is acceptable if they are overwritten (deep storages may be eventually
* consistent or otherwise unable to support transactional writes).
* @return segment descriptor
* @throws IOException if the push fails
*/
DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;

//use map instead of LoadSpec class to avoid dependency pollution.
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
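
To make the contract concrete, here is a minimal, hypothetical sketch (not part of this PR) of the two write modes, with a plain in-memory map standing in for deep storage; the class and method names are invented for illustration:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InMemorySegmentStore
{
  private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

  // Mirrors the push() contract above: replaceExisting == true MUST overwrite,
  // since exactly-once indexers depend on it; replaceExisting == false SHOULD
  // leave an existing object untouched, so that the first push wins.
  void put(String key, byte[] data, boolean replaceExisting)
  {
    if (replaceExisting) {
      objects.put(key, data);
    } else {
      objects.putIfAbsent(key, data);
    }
  }
}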

@@ -109,12 +109,13 @@ public DataSegment uploadDataSegment(
final long size,
final File compressedSegmentData,
final File descriptorFile,
final Map<String, String> azurePaths
final Map<String, String> azurePaths,
final boolean replaceExisting
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"), replaceExisting);
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"), replaceExisting);

final DataSegment outSegment = segment
.withSize(size)
@@ -131,9 +132,9 @@ public DataSegment uploadDataSegment(
}

@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
throws IOException
{

log.info("Uploading [%s] to Azure.", indexFilesDir);

final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
@@ -153,7 +154,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr
@Override
public DataSegment call() throws Exception
{
return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths);
return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting);
}
},
config.getMaxTries()
@@ -25,7 +25,6 @@
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;

import io.druid.java.util.common.logger.Logger;

import java.io.File;
@@ -81,14 +80,24 @@ public List<String> emptyCloudBlobDirectory(final String containerName, final St

}

public void uploadBlob(final File file, final String containerName, final String blobPath)
public void uploadBlob(
final File file,
final String containerName,
final String blobPath,
final boolean replaceExisting
)
throws IOException, StorageException, URISyntaxException

{
CloudBlobContainer container = getCloudBlobContainer(containerName);
try (FileInputStream stream = new FileInputStream(file)) {
CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
blob.upload(stream, file.length());

if (!replaceExisting && blob.exists()) {
log.info("Skipping push because blob [%s] exists && replaceExisting == false", blobPath);
} else {
blob.upload(stream, file.length());
}
}
}
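
Note that exists() followed by upload() is not atomic: a concurrent pusher can create the blob between the two calls, so a replaceExisting == false caller may still be overwritten, which the interface javadoc explicitly tolerates ("SHOULD", not "MUST"). A hedged sketch of a stricter alternative, not part of this PR and assuming an azure-storage SDK version that provides AccessCondition.generateIfNotExistsCondition(); it reuses the local variables (stream, file, blob) from the method above:

import com.microsoft.azure.storage.AccessCondition;

// Hypothetical alternative: let the service reject the write when the blob
// already exists; the upload then fails with a StorageException on conflict
// rather than silently overwriting.
blob.upload(stream, file.length(), AccessCondition.generateIfNotExistsCondition(), null, null);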

@@ -59,7 +59,7 @@ public void pushTaskLog(final String taskid, final File logFile) throws IOExcept
try {
AzureUtils.retryAzureOperation(
(Callable<Void>) () -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true);
return null;
},
config.getMaxTries()
@@ -104,7 +104,7 @@ public void testPush() throws Exception
size
);

DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
}
@@ -133,9 +133,9 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn
final File descriptorFile = new File("descriptor.json");
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);

azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"), true);
expectLastCall();
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"), true);
expectLastCall();

replayAll();
@@ -146,7 +146,8 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn
0, // empty file
compressedSegmentData,
descriptorFile,
azurePaths
azurePaths,
true
);

assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());
@@ -65,7 +65,7 @@ public void testPushTaskLog() throws Exception
try {
final File logFile = new File(tmpDir, "log");

azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log");
azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log", true);
expectLastCall();

replayAll();
@@ -24,8 +24,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;

import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
@@ -53,7 +54,8 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
@Inject
public CassandraDataSegmentPusher(
CassandraDataSegmentConfig config,
ObjectMapper jsonMapper)
ObjectMapper jsonMapper
)
{
super(config);
this.jsonMapper = jsonMapper;
@@ -73,13 +75,14 @@ public String getPathForHadoop(String dataSource)
}

@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean replaceExisting)
throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
this.getStorageDir(segment)
);
);

// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
@@ -89,26 +92,28 @@ public DataSegment push(final File indexFilesDir, DataSegment segment) throws IO
int version = SegmentUtils.getVersionFromDir(indexFilesDir);

try {
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
if (!replaceExisting && doesObjectExist(indexStorage, key)) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", key);
} else {
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
}
}
catch (Exception e) {
throw new IOException(e);
}

segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object> of("type", "c*", "key", key)
)
.withBinaryVersion(version);
.withLoadSpec(ImmutableMap.<String, Object>of("type", "c*", "key", key))
.withBinaryVersion(version);

log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
@@ -120,4 +125,14 @@ public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException("not supported");
}

private boolean doesObjectExist(ChunkedStorageProvider provider, String objectName) throws Exception
{
try {
return ChunkedStorage.newInfoReader(provider, objectName).call().isValidForRead();
}
catch (NotFoundException e) {
return false;
}
}
}
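
The new doesObjectExist helper is an instance of a common idiom: probe for a key's metadata and translate the client's "not found" exception into a boolean instead of letting it propagate. A small self-contained sketch of the same pattern (hypothetical names, not part of this PR):

import java.util.concurrent.Callable;

final class ExistenceProbe
{
  // `probe` is any call that returns metadata for a key and throws when the key
  // is missing, e.g. ChunkedStorage.newInfoReader(provider, key)::call.
  static boolean exists(Callable<?> probe, Class<? extends Exception> notFound)
  {
    try {
      probe.call();
      return true;
    }
    catch (Exception e) {
      if (notFound.isInstance(e)) {
        return false; // a missing key is an expected outcome, not an error
      }
      throw new RuntimeException(e);
    }
  }
}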
@@ -74,7 +74,8 @@ public String getPathForHadoop(final String dataSource)
}

@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
throws IOException
{
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment));

@@ -98,18 +99,23 @@ public DataSegment call() throws Exception
segmentPath, outFile, objectApi.getRegion(),
objectApi.getContainer()
);
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);

// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);

if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath());
} else {
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);

// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
}

final DataSegment outSegment = inSegment
.withSize(indexSize)
@@ -58,4 +58,9 @@ public CloudFilesObject get(String path)
Payload payload = swiftObject.getPayload();
return new CloudFilesObject(payload, this.region, this.container, path);
}

public boolean exists(String path)
{
return objectApi.getWithoutBody(path) != null;
}
}
@@ -84,7 +84,7 @@ public void testPush() throws Exception
size
);

DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

Assert.assertEquals(segmentToPush.getSize(), segment.getSize());

@@ -93,7 +93,8 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen
return descriptorFile;
}

public void insert(final File file, final String contentType, final String path) throws IOException
public void insert(final File file, final String contentType, final String path, final boolean replaceExisting)
throws IOException
{
LOG.info("Inserting [%s] to [%s]", file, path);

@@ -102,11 +103,16 @@ public void insert(final File file, final String contentType, final String path)
InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
mediaContent.setLength(file.length());

storage.insert(config.getBucket(), path, mediaContent);
if (!replaceExisting && storage.exists(config.getBucket(), path)) {
LOG.info("Skipping push because path [%s] exists && replaceExisting == false", path);
} else {
storage.insert(config.getBucket(), path, mediaContent);
}
}

@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
throws IOException
{
LOG.info("Uploading [%s] to Google.", indexFilesDir);

@@ -128,8 +134,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr

descriptorFile = createDescriptorFile(jsonMapper, outSegment);

insert(indexFile, "application/zip", indexPath);
insert(descriptorFile, "application/json", descriptorPath);
insert(indexFile, "application/zip", indexPath, replaceExisting);
insert(descriptorFile, "application/json", descriptorPath, replaceExisting);

return outSegment;
}
@@ -103,20 +103,30 @@ public void testPush() throws Exception
storage,
googleAccountConfig,
jsonMapper
).addMockedMethod("insert", File.class, String.class, String.class).createMock();
).addMockedMethod("insert", File.class, String.class, String.class, boolean.class).createMock();

final String storageDir = pusher.getStorageDir(segmentToPush);
final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";

pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/zip"), EasyMock.eq(indexPath));
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/zip"),
EasyMock.eq(indexPath),
EasyMock.eq(true)
);
expectLastCall();
pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/json"), EasyMock.eq(descriptorPath));
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/json"),
EasyMock.eq(descriptorPath),
EasyMock.eq(true)
);
expectLastCall();

replayAll();

DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);