Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions-contrib/gce-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-compute</artifactId>
<version>v1-rev214-1.25.0</version>
<version>${com.google.apis.compute.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
4 changes: 0 additions & 4 deletions extensions-core/google-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<properties>
<com.google.apis.storage.version>v1-rev158-${com.google.apis.client.version}</com.google.apis.storage.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ public InputStream get(final String bucket, final String path) throws IOExceptio
public InputStream get(final String bucket, final String path, long start) throws IOException
{
final Get get = storage.objects().get(bucket, path);
if (start > 0) {
get.getMediaHttpDownloader().setBytesDownloaded(start);
}
get.getMediaHttpDownloader().setDirectDownloadEnabled(false);
return get.executeMediaAsInputStream();
InputStream inputStream = get.executeMediaAsInputStream();
inputStream.skip(start);
return inputStream;
}

public void delete(final String bucket, final String path) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ public InputStream openStream() throws IOException
start = 0;
}

InputStream stream = new GoogleByteSource(storage, config.getBucket(), taskKey).openStream();
stream.skip(start);

return stream;
return new GoogleByteSource(storage, config.getBucket(), taskKey).openStream(start);
}
catch (Exception e) {
throw new IOException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.google;

import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.services.storage.Storage;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;

public class GoogleStorageTest
{
@Test
public void testGet() throws IOException
{
String content = "abcdefghij";
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContent(content);
GoogleStorage googleStorage = makeGoogleStorage(response);
InputStream is = googleStorage.get("bucket", "path");
String actual = GoogleTestUtils.readAsString(is);
Assert.assertEquals(content, actual);
}

@Test
public void testGetWithOffset() throws IOException
{
String content = "abcdefghij";
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContent(content);
GoogleStorage googleStorage = makeGoogleStorage(response);
InputStream is = googleStorage.get("bucket", "path", 2);
String actual = GoogleTestUtils.readAsString(is);
Assert.assertEquals(content.substring(2), actual);
}

@Test
public void testInsert() throws IOException
{
String content = "abcdefghij";
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.addHeader("Location", "http://random-path");
response.setContent("{}");
MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build();
GoogleStorage googleStorage = makeGoogleStorage(transport);
googleStorage.insert("bucket", "path", new ByteArrayContent("text/html", StringUtils.toUtf8(content)));
MockLowLevelHttpRequest request = transport.getLowLevelHttpRequest();
String actual = request.getContentAsString();
Assert.assertEquals(content, actual);
}

private GoogleStorage makeGoogleStorage(MockLowLevelHttpResponse response)
{
MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build();
return makeGoogleStorage(transport);
}

private GoogleStorage makeGoogleStorage(MockHttpTransport transport)
{
HttpRequestInitializer initializer = new MockGoogleCredential.Builder().build();
Storage storage = new Storage(transport, JacksonFactory.getDefaultInstance(), initializer);
return new GoogleStorage(storage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testStreamTaskLogWithoutOffset() throws Exception
final String logPath = PREFIX + "/" + TASKID;
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
EasyMock.expect(storage.get(BUCKET, logPath)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));

replayAll();

Expand All @@ -134,19 +134,21 @@ public void testStreamTaskLogWithoutOffset() throws Exception
public void testStreamTaskLogWithPositiveOffset() throws Exception
{
final String testLog = "hello this is a log";

final int offset = 5;
final String expectedLog = testLog.substring(offset);
final String logPath = PREFIX + "/" + TASKID;
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
EasyMock.expect(storage.get(BUCKET, logPath)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
EasyMock.expect(storage.get(BUCKET, logPath, offset))
.andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog)));

replayAll();

final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, 5);
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, offset);

final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(5));
Assert.assertEquals(writer.toString(), expectedLog);

verifyAll();
}
Expand All @@ -155,19 +157,22 @@ public void testStreamTaskLogWithPositiveOffset() throws Exception
public void testStreamTaskLogWithNegative() throws Exception
{
final String testLog = "hello this is a log";

final int offset = -3;
final int internalOffset = testLog.length() + offset;
final String expectedLog = testLog.substring(internalOffset);
final String logPath = PREFIX + "/" + TASKID;
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length());
EasyMock.expect(storage.get(BUCKET, logPath)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
EasyMock.expect(storage.get(BUCKET, logPath, internalOffset))
.andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog)));

replayAll();

final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, -3);
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, offset);

final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3));
Assert.assertEquals(writer.toString(), expectedLog);

verifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IExpectationSetters;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.math.BigInteger;
import java.net.URI;
import java.util.HashMap;
Expand Down Expand Up @@ -119,4 +122,11 @@ public static void expectDeleteObjects(
resultExpectationSetter.andVoid();
}
}

public static String readAsString(InputStream is) throws IOException
{
final StringWriter writer = new StringWriter();
IOUtils.copy(is, writer, "UTF-8");
return writer.toString();
}
}
8 changes: 4 additions & 4 deletions licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4100,7 +4100,7 @@ name: Google Cloud Storage JSON API
license_category: binary
module: extensions/druid-google-extensions
license_name: Apache License version 2.0
version: v1-rev158-1.25.0
version: v1-rev20190523-1.26.0
libraries:
- com.google.apis: google-api-services-storage

Expand All @@ -4110,7 +4110,7 @@ name: Google Compute Engine API
license_category: binary
module: extensions/gce-extensions
license_name: Apache License version 2.0
version: v1-rev214-1.25.0
version: v1-rev20190607-1.26.0
libraries:
- com.google.apis: google-api-services-compute

Expand All @@ -4130,7 +4130,7 @@ name: Google APIs Client Library For Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.25.0
version: 1.26.0
libraries:
- com.google.api-client: google-api-client

Expand All @@ -4140,7 +4140,7 @@ name: Google HTTP Client Library For Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.25.0
version: 1.26.0
libraries:
- com.google.http-client: google-http-client
- com.google.http-client: google-http-client-jackson2
Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@
<!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
<zookeeper.version>3.4.14</zookeeper.version>
<checkerframework.version>2.5.7</checkerframework.version>
<com.google.apis.client.version>1.25.0</com.google.apis.client.version>
<com.google.apis.compute.version>v1-rev214-1.25.0</com.google.apis.compute.version>
<com.google.apis.client.version>1.26.0</com.google.apis.client.version>
<com.google.apis.compute.version>v1-rev20190607-${com.google.apis.client.version}</com.google.apis.compute.version>
<com.google.apis.storage.version>v1-rev20190523-${com.google.apis.client.version}</com.google.apis.storage.version>
<repoOrgId>apache.snapshots</repoOrgId>
<repoOrgName>Apache Snapshot Repository</repoOrgName>
<repoOrgUrl>https://repository.apache.org/snapshots</repoOrgUrl>
Expand Down