Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.storage.google;

import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.RE;
Expand All @@ -37,11 +38,19 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
private static final Logger LOG = new Logger(GoogleDataSegmentKiller.class);

private final GoogleStorage storage;
private final GoogleAccountConfig accountConfig;
private final GoogleInputDataConfig inputDataConfig;

@Inject
public GoogleDataSegmentKiller(final GoogleStorage storage)
public GoogleDataSegmentKiller(
final GoogleStorage storage,
GoogleAccountConfig accountConfig,
GoogleInputDataConfig inputDataConfig
)
{
this.storage = storage;
this.accountConfig = accountConfig;
this.inputDataConfig = inputDataConfig;
}

@Override
Expand Down Expand Up @@ -93,8 +102,25 @@ private void deleteIfPresent(String bucket, String path) throws IOException
}

@Override
public void killAll()
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
LOG.info(
"Deleting all segment files from gs location [bucket: '%s' prefix: '%s']",
accountConfig.getBucket(),
accountConfig.getPrefix()
);
try {
GoogleUtils.deleteObjectsInPath(
storage,
inputDataConfig,
accountConfig.getBucket(),
accountConfig.getPrefix(),
Predicates.alwaysTrue()
);
}
catch (Exception e) {
LOG.error("Error occurred while deleting task log files from gs. Error: %s", e.getMessage());
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
Expand All @@ -33,19 +34,29 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Date;

public class GoogleTaskLogs implements TaskLogs
{
private static final Logger LOG = new Logger(GoogleTaskLogs.class);

private final GoogleTaskLogsConfig config;
private final GoogleStorage storage;
private final GoogleInputDataConfig inputDataConfig;
private final CurrentTimeMillisSupplier timeSupplier;

@Inject
public GoogleTaskLogs(GoogleTaskLogsConfig config, GoogleStorage storage)
public GoogleTaskLogs(
GoogleTaskLogsConfig config,
GoogleStorage storage,
GoogleInputDataConfig inputDataConfig,
CurrentTimeMillisSupplier timeSupplier
)
{
this.config = config;
this.storage = storage;
this.inputDataConfig = inputDataConfig;
this.timeSupplier = timeSupplier;
}

@Override
Expand Down Expand Up @@ -159,14 +170,39 @@ private String getTaskReportKey(String taskid)
}

@Override
public void killAll()
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
LOG.info(
"Deleting all task logs from gs location [bucket: '%s' prefix: '%s'].",
config.getBucket(),
config.getPrefix()
);

long now = timeSupplier.getAsLong();
killOlderThan(now);
}

@Override
public void killOlderThan(long timestamp)
public void killOlderThan(long timestamp) throws IOException
{
throw new UnsupportedOperationException("not implemented");
LOG.info(
"Deleting all task logs from gs location [bucket: '%s' prefix: '%s'] older than %s.",
config.getBucket(),
config.getPrefix(),
new Date(timestamp)
);
try {
GoogleUtils.deleteObjectsInPath(
storage,
inputDataConfig,
config.getBucket(),
config.getPrefix(),
(object) -> object.getUpdated().getValue() < timestamp
);
}
catch (Exception e) {
LOG.error("Error occurred while deleting task log files from gs. Error: %s", e.getMessage());
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@
import com.google.api.client.http.HttpResponseException;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

public class GoogleUtils
{
private static final Logger log = new Logger(GoogleUtils.class);
public static final Predicate<Throwable> GOOGLE_RETRY = GoogleUtils::isRetryable;

public static boolean isRetryable(Throwable t)
Expand Down Expand Up @@ -66,4 +69,40 @@ public static Iterator<StorageObject> lazyFetchingStorageObjectsIterator(
{
return new ObjectStorageIterator(storage, uris, maxListingLength);
}

/**
* Delete the files from Google Storage in a specified bucket, matching a specified prefix and filter
*
* @param storage Google Storage client
* @param config specifies the configuration to use when finding matching files in Google Storage to delete
* @param bucket Google Storage bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
* @throws Exception
*/
public static void deleteObjectsInPath(
GoogleStorage storage,
GoogleInputDataConfig config,
String bucket,
String prefix,
Predicate<StorageObject> filter
)
throws Exception
{
final Iterator<StorageObject> iterator = lazyFetchingStorageObjectsIterator(
storage,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("gs")).iterator(),
config.getMaxListingLength()
);

while (iterator.hasNext()) {
final StorageObject nextObject = iterator.next();
if (filter.apply(nextObject)) {
retryGoogleCloudStorageOperation(() -> {
storage.delete(nextObject.getBucket(), nextObject.getName());
return null;
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,43 @@

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

public class GoogleDataSegmentKillerTest extends EasyMockSupport
{
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final String BUCKET = "bucket";
private static final String PREFIX = "test/log";
private static final URI PREFIX_URI = URI.create(StringUtils.format("gs://%s/%s", BUCKET, PREFIX));
private static final String INDEX_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final String DESCRIPTOR_PATH = DataSegmentKiller.descriptorPath(INDEX_PATH);
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build();
private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non recoverable", new HttpHeaders()).build();


private static final DataSegment DATA_SEGMENT = new DataSegment(
"test",
Expand All @@ -54,10 +72,14 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
);

private GoogleStorage storage;
private GoogleAccountConfig accountConfig;
private GoogleInputDataConfig inputDataConfig;

@Before
public void before()
{
accountConfig = createMock(GoogleAccountConfig.class);
inputDataConfig = createMock(GoogleInputDataConfig.class);
storage = createMock(GoogleStorage.class);
}

Expand All @@ -71,7 +93,7 @@ public void killTest() throws SegmentLoadingException, IOException

replayAll();

GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);

killer.kill(DATA_SEGMENT);

Expand All @@ -91,7 +113,7 @@ public void killWithErrorTest() throws SegmentLoadingException, IOException

replayAll();

GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);

killer.kill(DATA_SEGMENT);

Expand All @@ -113,10 +135,117 @@ public void killRetryWithErrorTest() throws SegmentLoadingException, IOException

replayAll();

GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);

killer.kill(DATA_SEGMENT);

verifyAll();
}

@Test
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
{
StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1);

Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);

GoogleTestUtils.expectListObjects(
listRequest,
PREFIX_URI,
MAX_KEYS,
ImmutableList.of(object1, object2)
);

GoogleTestUtils.expectDeleteObjects(
storage,
ImmutableList.of(object1, object2),
ImmutableMap.of()
);
EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage);

GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);

killer.killAll();

EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage);
}


@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
{
StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);

Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);

GoogleTestUtils.expectListObjects(
listRequest,
PREFIX_URI,
MAX_KEYS,
ImmutableList.of(object1)
);

GoogleTestUtils.expectDeleteObjects(
storage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION)
);

EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage);

GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.killAll();

EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage);
}

@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
Storage.Objects.List listRequest = null;
try {
StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);

listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI);

GoogleTestUtils.expectListObjects(
listRequest,
PREFIX_URI,
MAX_KEYS,
ImmutableList.of(object1)
);

GoogleTestUtils.expectDeleteObjects(
storage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
);

EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);

EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage);

GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.killAll();
}
catch (IOException e) {
ioExceptionThrown = true;
}

Assert.assertTrue(ioExceptionThrown);

EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage);
}
}
Loading