82 commits
fac9d34
Fix lookup logging on node start (#5206) (#5207)
jon-wei Jan 4, 2018
e64092a
Fix broken KafkaEmitterConfig parsing (#5201) (#5208)
jon-wei Jan 4, 2018
14da754
Support replaceExisting parameter for segments pushers (#5187) (#5209)
gianm Jan 4, 2018
f3dec31
Upgrade to Calcite 1.15.0 (#5210) (#5214)
jon-wei Jan 4, 2018
a15becb
Add missing auth doc links (#5224) (#5225)
jon-wei Jan 5, 2018
5555809
Exclude sketches-core from druid-sql (#5223) (#5226)
jon-wei Jan 6, 2018
ee96a6a
Throw away rows with timestamps beyond long bounds in kafka indexing …
jon-wei Jan 9, 2018
d14b261
Support for encryption of MySQL connections (#5122) (#5247)
jon-wei Jan 10, 2018
61bef7d
Fix state check bug in Kafka Index Task (#5204) (#5248)
pjain1 Jan 10, 2018
87a1170
[Backport] fix timewarp query results when using timezones and crossi…
jon-wei Jan 12, 2018
08bd954
Fix APPROX_QUANTILE on outer groupBys. (#5253) (#5254)
gianm Jan 12, 2018
d7fe792
[maven-release-plugin] prepare release druid-0.12.0-rc1
jon-wei Jan 18, 2018
b287f8f
[maven-release-plugin] prepare for next development iteration
jon-wei Jan 18, 2018
5a8b106
Fix bugs in ImmutableRTree; Merge bytebuffer-collections module into …
leventov Jan 23, 2018
3f53820
Fix rewrite of queryPath for encoded joda intervals as query param on…
nishantmonu51 Jan 23, 2018
cdcd7e3
clean up intermediate_pushes directory for LocalDataSegmentPusher (#5…
dclim Jan 30, 2018
00f78ee
[Backport] fix RemoteTaskRunner terminating lazy workers below autosc…
gianm Feb 2, 2018
b5e1108
Remove Escalator jetty http client escalation method (#5322) (#5348)
jon-wei Feb 5, 2018
9a2a182
Fix CompactionTask doc (#5354)
jihoonson Feb 6, 2018
1d251d4
More ParseException handling for numeric dimensions (#5312) (#5356)
gianm Feb 7, 2018
7f28090
Fix two improper casts in HavingSpecMetricComparator. (#5352) (#5357)
gianm Feb 7, 2018
460ed49
Fix races in LookupSnapshotTaker, CoordinatorPollingBasicAuthenticato…
jon-wei Feb 7, 2018
4ba1bb8
Fix race in CoordinatorPollingBasicAuthorizerCacheManager. (#5359) (#…
jon-wei Feb 7, 2018
49e27d6
[Backport] Use a separate snapshot file per lookup tier. (#5358) (#5367)
jon-wei Feb 7, 2018
ab765b9
[Backport] Add metamx emitter, http clients, and metrics packages to …
jon-wei Feb 15, 2018
53eb4c7
fix segment info in Kafka indexing service docs (#5390) (#5393)
gianm Feb 15, 2018
93b79ed
Debug logging in HttpPostEmitter and Batch (#5365) (#5394)
gianm Feb 15, 2018
359d93e
Make HttpPostEmitter more robust in the face of serious errors (like …
gianm Feb 16, 2018
13b6ffd
Fix early publishing to early pushing in batch indexing & refactor ap…
jihoonson Feb 16, 2018
8c46900
[maven-release-plugin] prepare release druid-0.12.0-rc2
jon-wei Feb 16, 2018
ebb0592
[maven-release-plugin] prepare for next development iteration
jon-wei Feb 16, 2018
db31e2a
[Backport] Fix DefaultLimitSpec to respect sortByDimsFirst (#5398)
jihoonson Feb 17, 2018
3e84979
[maven-release-plugin] prepare release druid-0.12.0-rc3
jon-wei Feb 17, 2018
ce8220c
[maven-release-plugin] prepare for next development iteration
jon-wei Feb 17, 2018
5c73cba
Automatically adjust com.metamx.metrics Monitor class references (#54…
jon-wei Feb 23, 2018
0be7c55
Skip normal authentication for JDBC requests in Router (#5435) (#5442)
jon-wei Feb 28, 2018
d094f56
add task priority for kafka indexing (#5446)
jihoonson Mar 1, 2018
5a11b09
[Backport] Fix GroupBy limit push down descending sorting on numeric …
jon-wei Mar 2, 2018
c9e5f1b
Fix authorization check in supervisor history API (#5460) (#5463)
jon-wei Mar 3, 2018
8265f44
[Backport] Fix JSON serde for taskStatusPlus (#5469) (#5472)
jon-wei Mar 6, 2018
125552a
[maven-release-plugin] prepare release druid-0.12.0
jon-wei Mar 6, 2018
f650a1a
[maven-release-plugin] prepare for next development iteration
jon-wei Mar 6, 2018
f2534f0
SQL: Throttle metadata refreshes when they fail. (#5328) (#5599)
gianm Apr 10, 2018
4246779
Respect forceHashAggregation in queryContext (#5533) (#5609)
gianm Apr 10, 2018
ca43968
Add overlord unsecured paths to coordinator when using combined servi…
gianm Apr 10, 2018
d10c9ca
pass configuration from context into JobConf for determining Datasour…
gianm Apr 10, 2018
561b1b9
Lookups: Inherit "injective" from registered lookups, improve docs. (…
gianm Apr 10, 2018
d39a73e
More memory limiting for HttpPostEmitter (#5300) (#5602)
gianm Apr 10, 2018
8f43374
Fix round robining in router. (#5500) (#5612)
gianm Apr 10, 2018
a35fe63
SegmentMetadataQuery: Fix default interval handling. (#5489) (#5603)
gianm Apr 10, 2018
1a0c6d5
Authorize supervisor history instead of current active supervisors fo…
gianm Apr 10, 2018
62042c8
Fix SQLMetadataSegmentManager to allow succesive start and stop (#555…
gianm Apr 10, 2018
ae29895
Fix indexTask to respect forceExtendableShardSpecs (#5509) (#5607)
gianm Apr 10, 2018
f4e9e6b
ArrayAggregation: Use long to avoid overflow (#5544) (#5610)
gianm Apr 10, 2018
05fc1a3
DoublesSketchModule: Fix serde for DoublesSketchMergeAggregatorFactor…
gianm Apr 10, 2018
3a51416
Fix Kerberos Authentication failing requests without cookies and excl…
nishantmonu51 Apr 10, 2018
0b4632c
[Backport] Add missing type for MapVirtualColumn (#5616)
jihoonson Apr 10, 2018
aa37a78
Fix supervisor tombstone auth handling (#5504) (#5617)
gianm Apr 10, 2018
ce89de5
ParallelCombiner: Fix buffer leak on exception in "combine". (#5630) …
gianm Apr 12, 2018
5936b40
Fix coordinator loadStatus performance (#5632) (#5636)
jon-wei Apr 12, 2018
66ea105
Replace EmittedBatchCounter and UpdateCounter with ConcurrentAwaitabl…
gianm Apr 13, 2018
012a5be
[Backport] Fix NPE in compactionTask (#5645)
jihoonson Apr 17, 2018
5740ce9
[Backport] Fix HTTP OPTIONS request auth handling (#5638) (#5654)
jon-wei Apr 17, 2018
b551c9c
[Backport] Fix loadstatus?full double counting expected segments (#5680)
jihoonson Apr 24, 2018
7489fc9
[Backport] Add missing doc for automatic pendingSegments (#5682)
jihoonson Apr 24, 2018
0738e5c
[Backport] Fix coordinator's dataSource api with full parameter (#5679)
jihoonson Apr 24, 2018
a23cd5c
Use unique segment paths for Kafka indexing (#5692) (#5718)
dclim Apr 30, 2018
e2932fe
[maven-release-plugin] prepare release druid-0.12.1-rc1
jihoonson May 4, 2018
fc716fa
[maven-release-plugin] prepare for next development iteration
jihoonson May 4, 2018
7dc4f4b
[Backport] Fix Appenderator.push() to commit the metadata of all segm…
jihoonson May 8, 2018
8b7a2da
Kerberos Spnego Authentication Router Issue (#5706) (#5757)
b-slim May 8, 2018
af06da4
SegmentLoadDropHandler: Fix deadlock when segments have errors loadin…
gianm May 9, 2018
e0e55ee
[Backport] Fix metrics for inserting segments (#5759)
jihoonson May 10, 2018
a8d97a4
Fix KerberosAuthenticator serverPrincipal host replacement (#5766) (#…
jon-wei May 11, 2018
0a41d37
[maven-release-plugin] prepare release druid-0.12.1-rc2
jihoonson May 15, 2018
dffc819
[maven-release-plugin] prepare for next development iteration
jihoonson May 15, 2018
1b61559
[maven-release-plugin] prepare release druid-0.12.1
jihoonson Jun 5, 2018
b07b3af
[maven-release-plugin] prepare for next development iteration
jihoonson Jun 5, 2018
f974bb0
Fix defaultQueryTimeout (#5807) (#5944)
jihoonson Jul 5, 2018
15f99ec
Fix for when Hadoop dataSource inputSpec is specified multiple times.…
jihoonson Jul 5, 2018
cf03067
[Backport] Allow reordered segment allocation in kafka indexing servi…
jihoonson Jul 5, 2018
df7e822
Coordinator primary segment assignment fix (#5532)
clintropolis Apr 2, 2018
2 changes: 1 addition & 1 deletion api/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.12.0-SNAPSHOT</version>
<version>0.12.2-SNAPSHOT</version>
</parent>

<dependencies>
6 changes: 4 additions & 2 deletions api/src/main/java/io/druid/guice/JsonConfigurator.java
@@ -93,7 +93,6 @@ public <T> T configurate(Properties props, String propertyPrefix, Class<T> clazz
log.info(e, "Unable to parse [%s]=[%s] as a json object, using as is.", prop, propValue);
value = propValue;
}

hieraricalPutValue(propertyPrefix, prop, prop.substring(propertyBase.length()), value, jsonMap);
}
}
@@ -175,8 +174,11 @@ private static void hieraricalPutValue(
)
{
int dotIndex = property.indexOf('.');
// Always put the property under its full name, even if it is of the form a.b. This makes sure the property is
// available for classes whose JsonProperty names are themselves of the form a.b.
// Note: this will cause more properties than strictly required to be present in the jsonMap.
targetMap.put(property, value);
if (dotIndex < 0) {
targetMap.put(property, value);
return;
}
if (dotIndex == 0) {
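A minimal, self-contained sketch of the flattening behavior the new comment describes (an illustration, not the PR's exact code): the full dotted key is always kept, and the nested maps are built alongside it, so @JsonProperty names that themselves contain dots can still be bound.

import java.util.HashMap;
import java.util.Map;

class PropertyFlatteningSketch
{
  @SuppressWarnings("unchecked")
  static void putHierarchically(Map<String, Object> map, String property, Object value)
  {
    map.put(property, value); // always keep the flat dotted key, e.g. "prop2.prop.2"
    int dotIndex = property.indexOf('.');
    if (dotIndex <= 0) {
      return; // no dot (a leading dot is rejected by the real code; ignored here)
    }
    // also build the nested structure, so "a.b" is reachable as map.get("a") -> {"b": value}
    Map<String, Object> child = (Map<String, Object>)
        map.computeIfAbsent(property.substring(0, dotIndex), k -> new HashMap<String, Object>());
    putHierarchically(child, property.substring(dotIndex + 1), value);
  }

  public static void main(String[] args)
  {
    Map<String, Object> map = new HashMap<>();
    putHierarchically(map, "prop2.prop.2", "testing");
    // map now holds both the flat entry {"prop2.prop.2": "testing"} and the
    // nested entries {"prop2": {"prop.2": "testing", "prop": {"2": "testing"}}}
    System.out.println(map);
  }
}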
42 changes: 40 additions & 2 deletions api/src/main/java/io/druid/indexer/TaskStatusPlus.java
@@ -25,6 +25,7 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Objects;

public class TaskStatusPlus
{
@@ -40,7 +41,7 @@ public TaskStatusPlus(
@JsonProperty("id") String id,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("state") @Nullable TaskState state,
@JsonProperty("statusCode") @Nullable TaskState state,
@JsonProperty("duration") @Nullable Long duration,
@JsonProperty("location") TaskLocation location
)
@@ -74,7 +75,8 @@ public DateTime getQueueInsertionTime()
return queueInsertionTime;
}

@JsonProperty
@Nullable
@JsonProperty("statusCode")
public TaskState getState()
{
return state;
@@ -91,4 +93,40 @@ public TaskLocation getLocation()
{
return location;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

final TaskStatusPlus that = (TaskStatusPlus) o;
if (!id.equals(that.id)) {
return false;
}
if (!createdTime.equals(that.createdTime)) {
return false;
}
if (!queueInsertionTime.equals(that.queueInsertionTime)) {
return false;
}
if (!Objects.equals(state, that.state)) {
return false;
}
if (!Objects.equals(duration, that.duration)) {
return false;
}
return location.equals(that.location);
}

@Override
public int hashCode()
{
return Objects.hash(id, createdTime, queueInsertionTime, state, duration, location);
}
}
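Since the wire name changed from "state" to "statusCode", a round-trip sketch helps show the effect. This assumes DefaultObjectMapper, DateTimes, TaskState, and TaskLocation.unknown() as they exist in the 0.12.x tree; the task id and duration are made up.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.TaskLocation;
import io.druid.indexer.TaskState;
import io.druid.indexer.TaskStatusPlus;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;

class TaskStatusPlusSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();
    TaskStatusPlus status = new TaskStatusPlus(
        "index_kafka_wikipedia_0", // illustrative task id
        DateTimes.nowUtc(),
        DateTimes.nowUtc(),
        TaskState.RUNNING,
        100L,
        TaskLocation.unknown()
    );
    String json = mapper.writeValueAsString(status);
    // json now carries "statusCode":"RUNNING"; no field named "state" is emitted
    TaskStatusPlus roundTripped = mapper.readValue(json, TaskStatusPlus.class);
    System.out.println(status.equals(roundTripped)); // true, via the equals() added above
  }
}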
27 changes: 27 additions & 0 deletions api/src/main/java/io/druid/segment/loading/DataSegmentFinder.java
@@ -20,8 +20,11 @@
package io.druid.segment.loading;

import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;

/**
@@ -31,6 +34,8 @@
@ExtensionPoint
public interface DataSegmentFinder
{
Logger log = new Logger(DataSegmentFinder.class);

/**
* This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath
* workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder.
@@ -46,4 +51,26 @@ public interface DataSegmentFinder
* @return a set of segments that were found underneath workingDirPath
*/
Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException;

/**
* Adds dataSegment to timestampedSegments if it is not already present. If an entry with the same segment ID
* exists, it is replaced only when segmentModifiedAt is newer than the stored timestamp.
*
* @param timestampedSegments map of <segmentID, Pair<segment, modifiedAt>> containing segments with modified time
* @param dataSegment segment to add
* @param segmentModifiedAt segment modified timestamp
*/
static void putInMapRetainingNewest(
Map<String, Pair<DataSegment, Long>> timestampedSegments, DataSegment dataSegment, long segmentModifiedAt
)
{
timestampedSegments.merge(
dataSegment.getIdentifier(),
Pair.of(dataSegment, segmentModifiedAt),
(previous, current) -> {
log.warn("Multiple copies of segmentId [%s] found, using newest version", current.lhs.getIdentifier());
return previous.rhs > current.rhs ? previous : current;
}
);
}
}
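A usage sketch for putInMapRetainingNewest: pushing two copies of the same segment ID keeps the one with the newer modified time and logs a warning on the collision. The segment here is illustrative, and the builder defaults are assumed to be enough for a standalone example.

import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.Map;

class RetainNewestSketch
{
  public static void main(String[] args)
  {
    DataSegment segment = DataSegment.builder()
        .dataSource("wikipedia") // illustrative values throughout
        .interval(Intervals.of("2018-01-01/2018-01-02"))
        .version("v1")
        .size(0)
        .build();

    Map<String, Pair<DataSegment, Long>> timestamped = new HashMap<>();
    DataSegmentFinder.putInMapRetainingNewest(timestamped, segment, 1000L);
    DataSegmentFinder.putInMapRetainingNewest(timestamped, segment, 2000L); // newer copy wins
    System.out.println(timestamped.get(segment.getIdentifier()).rhs);      // 2000
  }
}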
33 changes: 29 additions & 4 deletions api/src/main/java/io/druid/segment/loading/DataSegmentKiller.java
@@ -20,16 +20,41 @@
package io.druid.segment.loading;

import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;

import java.io.IOException;

/**
*/
@ExtensionPoint
public interface DataSegmentKiller
{
void kill(DataSegment segments) throws SegmentLoadingException;
void killAll() throws IOException;
Logger log = new Logger(DataSegmentKiller.class);

/**
* Removes segment files (index and metadata) from deep storage.
* @param segment the segment to kill
* @throws SegmentLoadingException if the segment could not be completely removed
*/
void kill(DataSegment segment) throws SegmentLoadingException;

/**
* A more stoic killer who doesn't throw a tantrum if things get messy. Use when killing segments for best-effort
* cleanup.
* @param segment the segment to kill
*/
default void killQuietly(DataSegment segment)
{
try {
kill(segment);
}
catch (Exception e) {
log.debug(e, "Failed to kill segment %s", segment);
}
}

/**
* Like a nuke. Use wisely. Used by the 'reset-cluster' command, and of the built-in deep storage implementations, it
* is only implemented by local and HDFS.
*/
void killAll() throws IOException;
}
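A short sketch of where killQuietly fits, e.g. best-effort cleanup of segments that were pushed speculatively but never committed to the metadata store. The method and variable names here are illustrative, not part of this PR.

// Illustrative helper; assumes java.util.List and the types above are in scope.
void cleanUpAfterFailedPublish(DataSegmentKiller killer, List<DataSegment> pushedButUnpublished)
{
  for (DataSegment segment : pushedButUnpublished) {
    // unlike kill(), failures here are logged at debug level and not propagated
    killer.killQuietly(segment);
  }
}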
52 changes: 47 additions & 5 deletions api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
@@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@ExtensionPoint
public interface DataSegmentPusher
@@ -39,18 +40,53 @@ public interface DataSegmentPusher
@Deprecated
String getPathForHadoop(String dataSource);
String getPathForHadoop();
DataSegment push(File file, DataSegment segment) throws IOException;

/**
* Pushes index files and segment descriptor to deep storage.
* @param file directory containing index files
* @param segment segment descriptor
* @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or replica
* tasks can either overwrite or fail to overwrite existing segments leading to the possibility
* of different versions of the same segment ID containing different data. As an example, a Kafka
* indexing task starting at offset A and ending at offset B may push a segment to deep storage
* and then fail before writing the loadSpec to the metadata table, resulting in a replacement
* task being spawned. This replacement will also start at offset A but will read to offset C and
* will then push a segment to deep storage and write the loadSpec metadata. Without unique file
* paths, this can only work correctly if new segments overwrite existing segments. Suppose that
* at this point the task then fails so that the supervisor retries again from offset A. This 3rd
* attempt will overwrite the segments in deep storage before failing to write the loadSpec
* metadata, resulting in inconsistencies in the segment data now in deep storage and copies of
* the segment already loaded by historicals.
*
* If unique paths are used, the caller is responsible for cleaning up segments that were pushed but
* were not written to the metadata table (for example when using replica tasks).
* @return segment descriptor
* @throws IOException
*/
DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException;

//use map instead of LoadSpec class to avoid dependency pollution.
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);

/**
* @deprecated backward-compatibility shim that should be removed in the next major release;
* use {@link #getStorageDir(DataSegment, boolean)} instead.
*/
@Deprecated
default String getStorageDir(DataSegment dataSegment)
{
return getDefaultStorageDir(dataSegment);
return getStorageDir(dataSegment, false);
}

default String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
{
return getDefaultStorageDir(dataSegment, useUniquePath);
}

default String makeIndexPathName(DataSegment dataSegment, String indexName)
{
return StringUtils.format("./%s/%s", getStorageDir(dataSegment), indexName);
// This is only called from Hadoop batch, which doesn't require unique segment paths, so set useUniquePath=false
return StringUtils.format("./%s/%s", getStorageDir(dataSegment, false), indexName);
}

/**
@@ -66,13 +102,19 @@ default List<String> getAllowedPropertyPrefixesForHadoop()
// If above format is ever changed, make sure to change it appropriately in other places
// e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment)
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
segment.getShardSpec().getPartitionNum(),
useUniquePath ? generateUniquePath() : null
);
}

static String generateUniquePath()
{
return UUID.randomUUID().toString();
}
}
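To make the resulting layout concrete, here is a sketch of the paths getDefaultStorageDir produces in each mode. It assumes JOINER is a skipNulls '/' joiner, which is what makes the null suffix vanish in the non-unique case; the segment values and UUID are made up.

// Illustrative only; `segment` is some DataSegment for dataSource "wikipedia",
// interval 2018-01-01/2018-01-02, version "v1", partition 0.
String plain = DataSegmentPusher.getDefaultStorageDir(segment, false);
// -> "wikipedia/2018-01-01T00:00:00.000Z_2018-01-02T00:00:00.000Z/v1/0"
String unique = DataSegmentPusher.getDefaultStorageDir(segment, true);
// -> "wikipedia/2018-01-01T00:00:00.000Z_2018-01-02T00:00:00.000Z/v1/0/8e30c3f0-..."
// The trailing UUID gives every push its own directory, so a retried or replica task
// can never partially overwrite files that a historical may already have loaded.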
12 changes: 6 additions & 6 deletions api/src/main/java/io/druid/utils/CompressionUtils.java
@@ -36,35 +36,35 @@ public class CompressionUtils
private static final Logger log = new Logger(CompressionUtils.class);


@Deprecated // Use com.metamx.common.CompressionUtils.zip
@Deprecated // Use io.druid.java.util.common.CompressionUtils.zip
public static long zip(File directory, File outputZipFile) throws IOException
{
return io.druid.java.util.common.CompressionUtils.zip(directory, outputZipFile);
}


@Deprecated // Use com.metamx.common.CompressionUtils.zip
@Deprecated // Use io.druid.java.util.common.CompressionUtils.zip
public static long zip(File directory, OutputStream out) throws IOException
{
return io.druid.java.util.common.CompressionUtils.zip(directory, out);
}

@Deprecated // Use com.metamx.common.CompressionUtils.unzip
@Deprecated // Use io.druid.java.util.common.CompressionUtils.unzip
public static void unzip(File pulledFile, File outDir) throws IOException
{
io.druid.java.util.common.CompressionUtils.unzip(pulledFile, outDir);
}

@Deprecated // Use com.metamx.common.CompressionUtils.unzip
@Deprecated // Use io.druid.java.util.common.CompressionUtils.unzip
public static void unzip(InputStream in, File outDir) throws IOException
{
io.druid.java.util.common.CompressionUtils.unzip(in, outDir);
}

/**
* Uncompresses the gzip-compressed `pulledFile` into the `outDir`.
* Unlike `com.metamx.common.CompressionUtils.gunzip`, this function takes an output *DIRECTORY* and tries to guess the file name.
* It is recommended that the caller use `com.metamx.common.CompressionUtils.gunzip` and specify the output file themselves to ensure names are as expected
* Unlike `io.druid.java.util.common.CompressionUtils.gunzip`, this function takes an output *DIRECTORY* and tries to guess the file name.
* It is recommended that callers use `io.druid.java.util.common.CompressionUtils.gunzip` and specify the output file themselves to ensure names are as expected.
*
* @param pulledFile The source file
* @param outDir The destination directory to put the resulting file
34 changes: 30 additions & 4 deletions api/src/test/java/io/druid/guice/JsonConfiguratorTest.java
@@ -94,10 +94,13 @@ public ExecutableValidator forExecutables()
public void testTest()
{
Assert.assertEquals(
new MappableObject("p1", ImmutableList.<String>of("p2")),
new MappableObject("p1", ImmutableList.<String>of("p2"))
new MappableObject("p1", ImmutableList.<String>of("p2"), "p2"),
new MappableObject("p1", ImmutableList.<String>of("p2"), "p2")
);
Assert.assertEquals(
new MappableObject("p1", null, null),
new MappableObject("p1", ImmutableList.<String>of(), null)
);
Assert.assertEquals(new MappableObject("p1", null), new MappableObject("p1", ImmutableList.<String>of()));
}

@Test
@@ -140,6 +143,19 @@ public void testQuotedConfig()
Assert.assertEquals("testing \"prop1\"", obj.prop1);
Assert.assertEquals(ImmutableList.of(), obj.prop1List);
}

@Test
public void testPropertyWithDot()
{
final JsonConfigurator configurator = new JsonConfigurator(mapper, validator);
properties.setProperty(PROP_PREFIX + "prop2.prop.2", "testing");
properties.setProperty(PROP_PREFIX + "prop1", "prop1");
final MappableObject obj = configurator.configurate(properties, PROP_PREFIX, MappableObject.class);
Assert.assertEquals("testing", obj.prop2);
Assert.assertEquals(ImmutableList.of(), obj.prop1List);
Assert.assertEquals("prop1", obj.prop1);

}
}

class MappableObject
@@ -148,15 +164,19 @@ class MappableObject
final String prop1;
@JsonProperty("prop1List")
final List<String> prop1List;
@JsonProperty("prop2.prop.2")
final String prop2;

@JsonCreator
protected MappableObject(
@JsonProperty("prop1") final String prop1,
@JsonProperty("prop1List") final List<String> prop1List
@JsonProperty("prop1List") final List<String> prop1List,
@JsonProperty("prop2.prop.2") final String prop2
)
{
this.prop1 = prop1;
this.prop1List = prop1List == null ? ImmutableList.<String>of() : prop1List;
this.prop2 = prop2;
}


@@ -172,6 +192,12 @@ public String getProp1()
return prop1;
}

@JsonProperty
public String getProp2()
{
return prop2;
}

@Override
public boolean equals(Object o)
{