2 changes: 2 additions & 0 deletions sdks/java/testing/tpcds/build.gradle
@@ -54,6 +54,7 @@ configurations {
}

dependencies {
compile library.java.avro
compile library.java.vendored_guava_26_0_jre
compile library.java.vendored_calcite_1_20_0
compile library.java.commons_csv
@@ -62,6 +63,7 @@ dependencies {
compile "com.alibaba:fastjson:1.2.69"
compile project(":sdks:java:extensions:sql")
compile project(":sdks:java:extensions:sql:zetasql")
compile project(":sdks:java:io:parquet")
compile project(path: ":runners:google-cloud-dataflow-java")
compile project(path: ":sdks:java:core", configuration: "shadow")
testRuntimeClasspath library.java.slf4j_jdk14
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.tpcds;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -25,12 +26,12 @@
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
@@ -40,6 +41,8 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
import org.apache.commons.csv.CSVFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,27 +90,77 @@ private static PCollectionTuple getTables(

// Only when queryString contains tableName, the table is relevant to this query and will be
// added. This can avoid reading unnecessary data files.
// TODO: Simple but not reliable way since table name can be any substring in a query and can
// give false positives
if (queryString.contains(tableName)) {
// This is location path where the data are stored
String filePattern =
tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat";

PCollection<Row> table =
new TextTable(
tableSchema.getValue(),
filePattern,
new CsvToRow(tableSchema.getValue(), csvFormat),
new RowToCsv(csvFormat))
.buildIOReader(pipeline.begin())
.setCoder(SchemaCoder.of(tableSchema.getValue()))
.setName(tableSchema.getKey());

tables = tables.and(new TupleTag<>(tableName), table);
switch (tpcdsOptions.getSourceType()) {
case CSV:
{
PCollection<Row> table =
getTableCSV(pipeline, csvFormat, tpcdsOptions, dataSize, tableSchema, tableName);
tables = tables.and(new TupleTag<>(tableName), table);
break;
}
case PARQUET:
{
PCollection<GenericRecord> table =
getTableParquet(pipeline, tpcdsOptions, dataSize, tableName);
tables = tables.and(new TupleTag<>(tableName), table);
break;
}
default:
throw new IllegalStateException(
"Unexpected source type: " + tpcdsOptions.getSourceType());
}
}
}
return tables;
}

private static PCollection<GenericRecord> getTableParquet(
Member:
I suppose using Beam SQL DDL is a future goal so this and the CSV one won't be needed for the SQL case (but for the hand coded implementations).

Contributor Author:
Yes, it will be adjusted in the future.

Pipeline pipeline, TpcdsOptions tpcdsOptions, String dataSize, String tableName)
throws IOException {
org.apache.avro.Schema schema = getAvroSchema(tableName);

String filepattern =
tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + "/*.parquet";

return pipeline.apply(
"Read " + tableName + " (parquet)",
ParquetIO.read(schema)
.from(filepattern)
.withSplit()
// TODO: add .withProjection()
.withBeamSchemas(true));
}
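
For the `.withProjection()` TODO above, here is a minimal hypothetical sketch of how a column projection could look. It assumes `ParquetIO.Read#withProjection(projectionSchema, encoderSchema)` is available in the Beam version in use and reuses this file's imports and the `getAvroSchema` helper; the method name and the two hard-coded `item` columns are illustrative only, not part of this PR.

```java
// Hypothetical variant of getTableParquet that reads only the columns a query
// needs (method name, column choice and API usage are assumptions, not this PR).
private static PCollection<GenericRecord> getTableParquetProjected(
    Pipeline pipeline, TpcdsOptions tpcdsOptions, String dataSize, String tableName)
    throws IOException {
  org.apache.avro.Schema schema = getAvroSchema(tableName);

  // Projection schema: two TPC-DS "item" columns stand in here for a real,
  // per-query column list.
  org.apache.avro.Schema projection =
      org.apache.avro.SchemaBuilder.record(tableName + "_projection")
          .fields()
          .optionalLong("i_item_sk")
          .optionalString("i_item_id")
          .endRecord();

  String filepattern =
      tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + "/*.parquet";

  return pipeline.apply(
      "Read " + tableName + " (parquet, projected)",
      ParquetIO.read(schema)
          .from(filepattern)
          // Keep the splittable read, as in getTableParquet above.
          .withSplit()
          .withProjection(projection, projection));
}
```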

private static PCollection<Row> getTableCSV(
Pipeline pipeline,
CSVFormat csvFormat,
TpcdsOptions tpcdsOptions,
String dataSize,
Map.Entry<String, Schema> tableSchema,
String tableName) {
// This is the location path where the data is stored
String filePattern =
tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat";

return new TextTable(
tableSchema.getValue(),
filePattern,
new CsvToRow(tableSchema.getValue(), csvFormat),
new RowToCsv(csvFormat))
.buildIOReader(pipeline.begin())
.setCoder(SchemaCoder.of(tableSchema.getValue()))
.setName(tableSchema.getKey());
}

private static org.apache.avro.Schema getAvroSchema(String tableName) throws IOException {
String path = "schemas_avro/" + tableName + ".json";
return new org.apache.avro.Schema.Parser()
.parse(Resources.toString(Resources.getResource(path), Charsets.UTF_8));
}

/**
* Print the summary table after all jobs are finished.
*
@@ -160,28 +213,18 @@ public static void runUsingSqlTransform(String[] args) throws Exception {
Pipeline[] pipelines = new Pipeline[queryNames.length];
CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");

// Execute all queries, transform the each result into a PCollection<String>, write them into
// Execute all queries, transform each result into a PCollection<String>, write them into
// the txt file and store in a GCP directory.
for (int i = 0; i < queryNames.length; i++) {
// For each query, get a copy of pipelineOptions from command line arguments.
TpcdsOptions tpcdsOptionsCopy =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);

// Cast tpcdsOptions as a BeamSqlPipelineOptions object to read and set queryPlanner (the
// default one is Calcite, can change to ZetaSQL).
BeamSqlPipelineOptions beamSqlPipelineOptionsCopy =
tpcdsOptionsCopy.as(BeamSqlPipelineOptions.class);

// Finally, cast BeamSqlPipelineOptions as a DataflowPipelineOptions object to read and set
// other required pipeline optionsparameters .
DataflowPipelineOptions dataflowPipelineOptionsCopy =
beamSqlPipelineOptionsCopy.as(DataflowPipelineOptions.class);

// Set a unique job name using the time stamp so that multiple different pipelines can run
// together.
dataflowPipelineOptionsCopy.setJobName(queryNames[i] + "result" + System.currentTimeMillis());
tpcdsOptionsCopy.setJobName(queryNames[i] + "result" + System.currentTimeMillis());

pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
pipelines[i] = Pipeline.create(tpcdsOptionsCopy);
String queryString = QueryReader.readQuery(queryNames[i]);
PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNames[i]);

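The review discussion above mentions Beam SQL DDL as a future goal for defining these tables. A rough, hypothetical sketch of what that could look like follows; it assumes `SqlTransform#withDdlString` and a `parquet` table provider are available in the Beam version in use, and the class name, table columns, location and query are illustrative only, not part of this PR.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class TpcdsDdlSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Declare the table in SQL instead of hand-coding ParquetIO/TextTable readers.
    // Columns and LOCATION are placeholders, not the benchmark's real layout.
    String ddl =
        "CREATE EXTERNAL TABLE item (i_item_sk BIGINT, i_item_id VARCHAR) "
            + "TYPE parquet "
            + "LOCATION '/path/to/data/1G/item/'";

    // The DDL is registered before the query runs, so the query can refer to "item"
    // without a getTableParquet/getTableCSV helper.
    PCollection<Row> result =
        pipeline.apply(
            "Query item",
            SqlTransform.query("SELECT i_item_id FROM item LIMIT 10").withDdlString(ddl));

    pipeline.run().waitUntilFinish();
  }
}
```

In that setup the per-source switch in getTables and the new sourceType option would mainly matter for the hand-coded (non-SQL) implementations, as the reviewer notes.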
@@ -17,13 +17,13 @@
*/
package org.apache.beam.sdk.tpcds;

import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

/** Options used to configure TPC-DS test. */
public interface TpcdsOptions extends PipelineOptions {
public interface TpcdsOptions extends BeamSqlPipelineOptions {
@Description(
"The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'")
@Validation.Required
@@ -55,4 +55,10 @@ public interface TpcdsOptions extends PipelineOptions {
String getResultsDirectory();

void setResultsDirectory(String path);

@Description("Where the data comes from.")
@Default.Enum("CSV")
TpcdsUtils.SourceType getSourceType();

void setSourceType(TpcdsUtils.SourceType sourceType);
}
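A small hypothetical check of how the new option is expected to parse (the class name and standalone `main` are illustrative only): `PipelineOptionsFactory` maps `--sourceType` onto the enum by name, and `@Default.Enum("CSV")` keeps existing CSV-based runs unchanged when the flag is omitted.

```java
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.tpcds.TpcdsOptions;

public class SourceTypeOptionCheck {
  public static void main(String[] args) {
    // Explicit flag: parsed by enum name.
    TpcdsOptions parquet =
        PipelineOptionsFactory.fromArgs("--sourceType=PARQUET").as(TpcdsOptions.class);
    System.out.println(parquet.getSourceType()); // PARQUET

    // No flag: @Default.Enum("CSV") applies, so existing runs keep reading CSV.
    TpcdsOptions defaults = PipelineOptionsFactory.create().as(TpcdsOptions.class);
    System.out.println(defaults.getSourceType()); // CSV
  }
}
```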
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.tpcds;

/** Odd's 'n Ends used throughout queries and driver. */
Member:
Odd's 'n Ends?

Contributor Author:
It's just a synonym of miscellaneous articles.

public class TpcdsUtils {

/** Possible sources for events. */
public enum SourceType {
/** Read events from CSV files. */
CSV,
/** Read events from Parquet files. */
PARQUET
}
}