Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,31 @@

import io.druid.timeline.DataSegment;

import javax.annotation.Nullable;

public interface DataSegmentArchiver
{
public DataSegment archive(DataSegment segment) throws SegmentLoadingException;
public DataSegment restore(DataSegment segment) throws SegmentLoadingException;
/**
* Perform an archive task on the segment and return the resulting segment or null if there was no action needed.
*
* @param segment The source segment
*
* @return The segment after archiving or `null` if there was no archiving performed.
*
* @throws SegmentLoadingException on error
*/
@Nullable
DataSegment archive(DataSegment segment) throws SegmentLoadingException;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe annotate with @javax.annotation.Nullable? This will make the IDE warn you about using the object returned from this method without a null check.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of returning null, perhaps a nicer API would be to return an equivalent object. Then callers don't need to deal with potentially null returns. If a caller wants to know if anything was actually done, they could check retVal.equals(segment) rather than retVal != null.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's pretty challenging since loadSpec is just a Map<String, Object> which may have whatever junk in it. I opted for this way of doing it so that the implementation can make the determination on if the segment was moved or not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also see #3287 and #3286 for semi-related

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, I forgot DataSegment identity is based around the segment identifier. OK then in that case what I said is nonsense.


/**
* Perform the restore from an archived segment and return the resulting segment or null if there was no action needed.
*
* @param segment The source (archived) segment
*
* @return The segment after it has been unarchived, or `null` if there was no restore performed.
*
* @throws SegmentLoadingException on error
*/
@Nullable
DataSegment restore(DataSegment segment) throws SegmentLoadingException;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe annotate with @javax.annotation.Nullable?

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package io.druid.storage.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.guice.annotations.Json;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
Expand All @@ -31,15 +35,18 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg
{
private final S3DataSegmentArchiverConfig archiveConfig;
private final S3DataSegmentPusherConfig restoreConfig;
private final ObjectMapper mapper;

@Inject
public S3DataSegmentArchiver(
RestS3Service s3Client,
S3DataSegmentArchiverConfig archiveConfig,
S3DataSegmentPusherConfig restoreConfig
@Json ObjectMapper mapper,
RestS3Service s3Client,
S3DataSegmentArchiverConfig archiveConfig,
S3DataSegmentPusherConfig restoreConfig
)
{
super(s3Client, restoreConfig);
this.mapper = mapper;
this.archiveConfig = archiveConfig;
this.restoreConfig = restoreConfig;
}
Expand All @@ -50,13 +57,17 @@ public DataSegment archive(DataSegment segment) throws SegmentLoadingException
String targetS3Bucket = archiveConfig.getArchiveBucket();
String targetS3BaseKey = archiveConfig.getArchiveBaseKey();

return move(
final DataSegment archived = move(
segment,
ImmutableMap.<String, Object>of(
"bucket", targetS3Bucket,
"baseKey", targetS3BaseKey
)
);
if (sameLoadSpec(segment, archived)) {
return null;
}
return archived;
}

@Override
Expand All @@ -65,12 +76,27 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException
String targetS3Bucket = restoreConfig.getBucket();
String targetS3BaseKey = restoreConfig.getBaseKey();

return move(
final DataSegment restored = move(
segment,
ImmutableMap.<String, Object>of(
"bucket", targetS3Bucket,
"baseKey", targetS3BaseKey
)
);

if (sameLoadSpec(segment, restored)) {
return null;
}
return restored;
}

/**
 * Returns true when the two segments' load specs resolve to the same S3 location
 * (equal bucket AND equal key). Each load spec map is materialized into an
 * {@link S3LoadSpec} via the injected Jackson mapper before comparison.
 *
 * @param s1 first segment
 * @param s2 second segment
 *
 * @return true if both load specs point at the same bucket/key pair
 */
boolean sameLoadSpec(DataSegment s1, DataSegment s2)
{
final S3LoadSpec firstSpec = (S3LoadSpec) mapper.convertValue(s1.getLoadSpec(), LoadSpec.class);
final S3LoadSpec secondSpec = (S3LoadSpec) mapper.convertValue(s2.getLoadSpec(), LoadSpec.class);
// Objects.equal is null-tolerant, so missing bucket/key entries compare safely.
final boolean bucketsMatch = Objects.equal(firstSpec.getBucket(), secondSpec.getBucket());
final boolean keysMatch = Objects.equal(firstSpec.getKey(), secondSpec.getKey());
return bucketsMatch && keysMatch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
@JsonTypeName(S3StorageDruidModule.SCHEME)
public class S3LoadSpec implements LoadSpec
{
@JsonProperty(S3DataSegmentPuller.BUCKET)
private final String bucket;
@JsonProperty(S3DataSegmentPuller.KEY)
private final String key;

private final S3DataSegmentPuller puller;
Expand All @@ -61,4 +59,16 @@ public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size());
}

/**
 * @return the S3 bucket this load spec points at; serialized under the
 * {@code S3DataSegmentPuller.BUCKET} JSON property.
 */
@JsonProperty(S3DataSegmentPuller.BUCKET)
public String getBucket()
{
return bucket;
}

/**
 * @return the S3 object key this load spec points at; serialized under the
 * {@code S3DataSegmentPuller.KEY} JSON property.
 */
@JsonProperty(S3DataSegmentPuller.KEY)
public String getKey()
{
return key;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.storage.s3;

import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Map;

/**
 * Tests for {@code S3DataSegmentArchiver}, focused on the null-return contract of
 * archive()/restore(): a segment is returned when the (stubbed) move produced a different
 * load spec, and {@code null} when the load spec was unchanged.
 */
public class S3DataSegmentArchiverTest
{
// Mapper able to deserialize S3LoadSpec; every Jackson-injectable value resolves to PULLER.
// NOTE: findInjectableValue only runs during deserialization (inside the test methods),
// so referencing PULLER before its declaring line below is safe.
private static final ObjectMapper MAPPER = new DefaultObjectMapper()
.setInjectableValues(new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
// S3LoadSpec is the only registered subtype here, so the puller is the only injectable needed.
return PULLER;
}
}).registerModule(new SimpleModule("s3-archive-test-module").registerSubtypes(S3LoadSpec.class));
// Archive destination; values are arbitrary but must differ from SOURCE_SEGMENT's bucket/key
// so that the "did it move" comparison can distinguish the two.
private static final S3DataSegmentArchiverConfig ARCHIVER_CONFIG = new S3DataSegmentArchiverConfig()
{
@Override
public String getArchiveBucket()
{
return "archive_bucket";
}

@Override
public String getArchiveBaseKey()
{
return "archive_base_key";
}
};
private static final S3DataSegmentPusherConfig PUSHER_CONFIG = new S3DataSegmentPusherConfig();
// Strict mock with no expectations: move() is overridden in each test, so no S3 call may occur.
private static final RestS3Service S3_SERVICE = EasyMock.createStrictMock(RestS3Service.class);
private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE);
// Canonical input segment whose load spec points at the "source" bucket/key.
private static final DataSegment SOURCE_SEGMENT = DataSegment
.builder()
.binaryVersion(1)
.dataSource("dataSource")
.dimensions(ImmutableList.<String>of())
.interval(Interval.parse("2015/2016"))
.version("version")
.loadSpec(ImmutableMap.<String, Object>of(
"type",
S3StorageDruidModule.SCHEME,
S3DataSegmentPuller.BUCKET,
"source_bucket",
S3DataSegmentPuller.KEY,
"source_key"
))
.build();

// Mutable pusher config must be populated before any archiver is constructed.
@BeforeClass
public static void setUpStatic()
{
PUSHER_CONFIG.setBaseKey("push_base");
PUSHER_CONFIG.setBucket("push_bucket");
}

// archive(): when the stubbed move yields a segment with a DIFFERENT load spec,
// that moved segment is returned as-is.
@Test
public void testSimpleArchive() throws Exception
{
final DataSegment archivedSegment = SOURCE_SEGMENT
.withLoadSpec(ImmutableMap.<String, Object>of(
"type",
S3StorageDruidModule.SCHEME,
S3DataSegmentPuller.BUCKET,
ARCHIVER_CONFIG.getArchiveBucket(),
S3DataSegmentPuller.KEY,
ARCHIVER_CONFIG.getArchiveBaseKey() + "archived"
));
final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
{
@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
// Bypass S3 entirely; pretend the move landed at the archive location.
return archivedSegment;
}
};
Assert.assertEquals(archivedSegment, archiver.archive(SOURCE_SEGMENT));
}

// archive(): when move returns the input unchanged (same load spec), archive() reports
// "nothing happened" by returning null.
@Test
public void testSimpleArchiveDoesntMove() throws Exception
{
final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
{
@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return SOURCE_SEGMENT;
}
};
Assert.assertNull(archiver.archive(SOURCE_SEGMENT));
}

// restore(): mirror of testSimpleArchive. The fixture reuses the ARCHIVER_CONFIG-based
// location rather than the pusher's restore target; that is fine here because move() is
// stubbed and only load-spec inequality matters.
@Test
public void testSimpleRestore() throws Exception
{
final DataSegment archivedSegment = SOURCE_SEGMENT
.withLoadSpec(ImmutableMap.<String, Object>of(
"type",
S3StorageDruidModule.SCHEME,
S3DataSegmentPuller.BUCKET,
ARCHIVER_CONFIG.getArchiveBucket(),
S3DataSegmentPuller.KEY,
ARCHIVER_CONFIG.getArchiveBaseKey() + "archived"
));
final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
{
@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return archivedSegment;
}
};
Assert.assertEquals(archivedSegment, archiver.restore(SOURCE_SEGMENT));
}

// restore(): unchanged load spec means no restore happened, so null is returned.
@Test
public void testSimpleRestoreDoesntMove() throws Exception
{
final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
{
@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return SOURCE_SEGMENT;
}
};
Assert.assertNull(archiver.restore(SOURCE_SEGMENT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
// Move segments
for (DataSegment segment : unusedSegments) {
final DataSegment archivedSegment = toolbox.getDataSegmentArchiver().archive(segment);
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment)));
if (archivedSegment != null) {
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment)));
} else {
log.info("No action was taken for [%s]", segment);
}
}

return TaskStatus.success(getId());
Expand Down
Loading