Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@
</repositories>

<dependencies>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/com/treasuredata/client/TDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -61,7 +62,6 @@
import com.treasuredata.client.model.TDUser;
import com.treasuredata.client.model.TDUserList;
import com.treasuredata.client.model.impl.TDScheduleRunResult;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -597,10 +597,22 @@ public void updateTableSchema(String databaseName, String tableName, List<TDColu
for (TDColumn newColumn : newSchema) {
builder.add(ImmutableList.of(newColumn.getKeyString(), newColumn.getType().toString(), newColumn.getName()));
}
String schemaJson = JSONObject.toJSONString(ImmutableMap.of("schema", builder.build(), "ignore_duplicate_schema", ignoreDuplicate));
String schemaJson = toJSONString(ImmutableMap.of("schema", builder.build(), "ignore_duplicate_schema", ignoreDuplicate));
doPost(buildUrl("/v3/table/update-schema", databaseName, tableName), ImmutableMap.<String, String>of(), Optional.of(schemaJson), String.class);
}

private static final ObjectMapper objectMapper = new ObjectMapper();

private static String toJSONString(Map<String, Object> map)
{
try {
return objectMapper.writeValueAsString(map);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@Override
public void appendTableSchema(String databaseName, String tableName, List<TDColumn> appendedSchema)
{
Expand All @@ -614,7 +626,7 @@ public void appendTableSchema(String databaseName, String tableName, List<TDColu
// So we should not pass `appendedColumn.getName()` here.
builder.add(ImmutableList.of(appendedColumn.getKeyString(), appendedColumn.getType().toString()));
}
String schemaJson = JSONObject.toJSONString(ImmutableMap.of("schema", builder.build()));
String schemaJson = toJSONString(ImmutableMap.of("schema", builder.build()));
doPost(buildUrl("/v3/table/append-schema", databaseName, tableName), ImmutableMap.<String, String>of(), Optional.of(schemaJson), String.class);
}

Expand Down Expand Up @@ -794,7 +806,7 @@ public void performBulkImportSession(String sessionName, Optional<String> poolNa
{
Optional<String> jsonBody = Optional.empty();
if (poolName.isPresent()) {
jsonBody = Optional.of(JSONObject.toJSONString(ImmutableMap.of("pool_name", poolName.get())));
jsonBody = Optional.of(toJSONString(ImmutableMap.of("pool_name", poolName.get())));
}
doPost(buildUrl("/v3/bulk_import/perform", sessionName), ImmutableMap.of("priority", Integer.toString(priority.toInt())), jsonBody, String.class);
}
Expand Down
30 changes: 8 additions & 22 deletions src/main/java/com/treasuredata/client/model/TDColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.RuntimeJsonMappingException;
import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
Expand Down Expand Up @@ -92,34 +91,21 @@ public String getKeyString()
return new String(key, StandardCharsets.UTF_8);
}

private static JSONArray castToArray(Object obj)
{
if (obj instanceof JSONArray) {
return (JSONArray) obj;
}
else {
throw new RuntimeJsonMappingException("Not an json array: " + obj);
}
}
private static final ObjectMapper objectMapper = new ObjectMapper();

public static List<TDColumn> parseTuple(String jsonStr)
{
// unescape json quotation
try {
String unescaped = jsonStr.replaceAll("\\\"", "\"");
JSONArray arr = castToArray(new JSONParser().parse(unescaped));
List<TDColumn> columnList = new ArrayList<>(arr.size());
for (Object e : arr) {
JSONArray columnNameAndType = castToArray(e);
String[] s = new String[columnNameAndType.size()];
for (int i = 0; i < columnNameAndType.size(); ++i) {
s[i] = columnNameAndType.get(i).toString();
}
columnList.add(parseTuple(s));
String[][] arr = objectMapper.readValue(unescaped, String[][].class);
List<TDColumn> columnList = new ArrayList<>(arr.length);
for (String[] columnNameAndType : arr) {
columnList.add(parseTuple(columnNameAndType));
}
return columnList;
}
catch (ParseException e) {
catch (JsonProcessingException e) {
LoggerFactory.getLogger(TDColumn.class).error("Failed to parse json string", e);
return new ArrayList<>(0);
}
Expand Down
89 changes: 38 additions & 51 deletions src/test/java/com/treasuredata/client/TestTDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package com.treasuredata.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
Expand Down Expand Up @@ -59,9 +61,6 @@
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -96,6 +95,7 @@
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -124,6 +124,8 @@
*/
public class TestTDClient
{
private static final ObjectMapper objectMapper = new ObjectMapper();

private static final Logger logger = LoggerFactory.getLogger(TestTDClient.class);

private static final String SAMPLE_DB = "_tdclient_test";
Expand Down Expand Up @@ -192,12 +194,12 @@ public void dbNameValidation()

@Test
public void serverStatus()
throws JSONException
throws JsonProcessingException
{
String status = client.serverStatus();
logger.info(status);
JSONObject s = new JSONObject(status);
assertEquals("ok", s.getString("status"));
Map<String, String> s = objectMapper.readValue(status, Map.class);
assertEquals("ok", s.get("status"));
}

@Test
Expand Down Expand Up @@ -373,24 +375,19 @@ public void submitJob()
assertTrue(schema.isPresent());
assertEquals("[[\"cnt\", \"bigint\"]]", schema.get());

JSONArray array = client.jobResult(jobId, TDResultFormat.JSON, new Function<InputStream, JSONArray>()
{
@Override
public JSONArray apply(InputStream input)
{
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String result = reader.lines().collect(Collectors.joining());
logger.info("result:\n" + result);
return new JSONArray(result);
}
catch (Exception e) {
throw new RuntimeException(e);
}
String[] array = client.jobResult(jobId, TDResultFormat.JSON, input -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String result = reader.lines().collect(Collectors.joining());
logger.info("result:\n" + result);
return objectMapper.readValue(result, String[].class);
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
assertEquals(1, array.length());
assertEquals(1, array.length);
assertEquals(1, jobInfo.getNumRecords());
assertTrue(array.getLong(0) > 0);
assertTrue(Long.parseLong(array[0]) > 0);

// test msgpack.gz format
client.jobResult(jobId, TDResultFormat.MESSAGE_PACK_GZ, new Function<InputStream, Object>()
Expand Down Expand Up @@ -497,25 +494,20 @@ private void submitJobWithEngineVersion(TDJob.Type type, Optional<TDJob.EngineVe
assertTrue(schema.isPresent());
assertEquals("[[\"cnt\", \"bigint\"]]", schema.get());

JSONArray array = client.jobResult(jobId, TDResultFormat.JSON, new Function<InputStream, JSONArray>()
{
@Override
public JSONArray apply(InputStream input)
{
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String result = reader.lines().collect(Collectors.joining());
logger.info("result:\n" + result);
return new JSONArray(result);
}
catch (Exception e) {
throw new RuntimeException(e);
}
String[] array = client.jobResult(jobId, TDResultFormat.JSON, input -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String result = reader.lines().collect(Collectors.joining());
logger.info("result:\n" + result);
return objectMapper.readValue(result, String[].class);
}
catch (Exception e) {
throw new RuntimeException(e);
}
});

assertEquals(1, array.length());
assertEquals(1, array.length);
assertEquals(1, jobInfo.getNumRecords());
assertTrue(array.getLong(0) > 0);
assertTrue(Long.parseLong(array[0]) > 0);
}

@Test
Expand Down Expand Up @@ -582,24 +574,19 @@ public void submitJobWithScheduledTime()
String jobId = client.submit(request);
waitJobCompletion(jobId);

JSONArray array = client.jobResult(jobId, TDResultFormat.JSON, new Function<InputStream, JSONArray>()
{
@Override
public JSONArray apply(InputStream input)
{
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String result = reader.lines().collect(Collectors.joining());
logger.info("result:\n" + result);
return new JSONArray(result);
}
catch (Exception e) {
throw new RuntimeException(e);
}
String[] array = client.jobResult(jobId, TDResultFormat.JSON, input -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String result = reader.lines().collect(Collectors.joining());
logger.info("result:\n" + result);
return objectMapper.readValue(result, String[].class);
}
catch (Exception e) {
throw new RuntimeException(e);
}
});

assertEquals(1, array.length());
assertEquals(scheduledTime, array.getLong(0));
assertEquals(1, array.length);
assertEquals(scheduledTime, Long.parseLong(array[0]));
}

@Test
Expand Down