Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

public abstract class CloudObjectInputSource<T extends InputEntity> extends AbstractInputSource
implements SplittableInputSource<CloudObjectLocation>
{
private final List<URI> uris;
private final List<URI> prefixes;
private final List<CloudObjectLocation> objects;

public CloudObjectInputSource(
String scheme,
@Nullable List<URI> uris,
@Nullable List<URI> prefixes,
@Nullable List<CloudObjectLocation> objects
)
{
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;

if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else {
throwIfIllegalArgs(true);
}
}

@JsonProperty
public List<URI> getUris()
{
return uris;
}

@JsonProperty
public List<URI> getPrefixes()
{
return prefixes;
}

@Nullable
@JsonProperty
public List<CloudObjectLocation> getObjects()
{
return objects;
}

/**
* Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. This
* is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}.
*/
protected abstract T createEntity(InputSplit<CloudObjectLocation> split);

/**
* Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using
* this input sources backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits},
* only if {@link #prefixes} is set, otherwise the splits are created directly from {@link #uris} or {@link #objects}.
* Calling if {@link #prefixes} is not set is likely to either lead to an empty iterator or null pointer exception.
*/
protected abstract Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add javadocs for the new protected methods?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if prefixes aren't specified for the input source?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated javadoc to mention that this method is called internally by createSplits and estimateNumSplits


@Override
public Stream<InputSplit<CloudObjectLocation>> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return objects.stream().map(InputSplit::new);
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new);
}

return getPrefixesSplitStream();
}

@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return objects.size();
}

if (!CollectionUtils.isNullOrEmpty(uris)) {
return uris.size();
}

return Ints.checkedCast(getPrefixesSplitStream().count());
}

@Override
public boolean needsFormat()
{
return true;
}

@Override
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
createSplits(inputFormat, null).map(this::createEntity),
temporaryDirectory
);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CloudObjectInputSource that = (CloudObjectInputSource) o;
return Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) &&
Objects.equals(objects, that.objects);
}

@Override
public int hashCode()
{
return Objects.hash(uris, prefixes, objects);
}

private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
{
if (clause) {
throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;

import java.net.URI;
Expand All @@ -45,6 +46,14 @@
*/
public class CloudObjectLocation
{
public static URI validateUriScheme(String scheme, URI uri)
{
if (!scheme.equalsIgnoreCase(uri.getScheme())) {
throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), scheme);
}
return uri;
}

private final String bucket;
private final String path;

Expand Down Expand Up @@ -125,5 +134,4 @@ public int hashCode()
{
return Objects.hash(bucket, path);
}

}
47 changes: 46 additions & 1 deletion docs/development/extensions-core/google.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,56 @@ This extension also provides an input source for Druid native batch ingestion to
...
```

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"prefixes": ["gs://foo/bar", "gs://bar/foo"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```


```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `google`.|N/A|yes|
|uris|JSON array of URIs where Google Cloud Storage files to be ingested are located.|N/A|yes|
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|


Google Cloud Storage object:

|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud Storage bucket|N/A|yes|
|path|The path where data is located.|N/A|yes|

## Firehose

Expand Down
13 changes: 13 additions & 0 deletions extensions-core/google-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
Expand Down Expand Up @@ -158,5 +165,11 @@
<artifactId>jackson-module-guice</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
Expand All @@ -33,36 +34,33 @@

public class GoogleCloudStorageEntity extends RetryingInputEntity
{
private final GoogleStorage storage;
private final URI uri;
private final CloudObjectLocation location;
private final GoogleByteSource byteSource;

GoogleCloudStorageEntity(GoogleStorage storage, URI uri)
GoogleCloudStorageEntity(GoogleStorage storage, CloudObjectLocation location)
{
this.storage = storage;
this.uri = uri;
this.location = location;
this.byteSource = new GoogleByteSource(storage, location.getBucket(), location.getPath());
}

@Nullable
@Override
public URI getUri()
{
return uri;
return location.toUri(GoogleCloudStorageInputSource.SCHEME);
}

@Override
protected InputStream readFrom(long offset) throws IOException
{
// Get data of the given object and open an input stream
final String bucket = uri.getAuthority();
final String key = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key);
return byteSource.openStream(offset);
}

@Override
protected String getPath()
{
return StringUtils.maybeRemoveLeadingSlash(uri.getPath());
return StringUtils.maybeRemoveLeadingSlash(byteSource.getPath());
}

@Override
Expand Down
Loading