Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ val avroVersion = "1.7.4"
val bijectionVersion = "0.8.1"
val cascadingAvroVersion = "2.1.2"
val chillVersion = "0.7.1"
val elephantbirdVersion = "4.8"
val elephantbirdVersion = "4.14-RC2"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
val hbaseVersion = "0.94.10"
Expand Down Expand Up @@ -222,7 +222,9 @@ lazy val scalding = Project(
scaldingCommons,
scaldingAvro,
scaldingParquet,
scaldingParquetCascading,
scaldingParquetScrooge,
scaldingParquetScroogeCascading,
scaldingHRaven,
scaldingRepl,
scaldingJson,
Expand Down Expand Up @@ -250,7 +252,9 @@ lazy val scaldingAssembly = Project(
scaldingCommons,
scaldingAvro,
scaldingParquet,
scaldingParquetCascading,
scaldingParquetScrooge,
scaldingParquetScroogeCascading,
scaldingHRaven,
scaldingRepl,
scaldingJson,
Expand Down Expand Up @@ -292,11 +296,13 @@ lazy val scaldingArgs = module("args")
lazy val scaldingDate = module("date")

lazy val cascadingVersion =
System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1")
System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "3.1.0-wip-52")

// This is a temporary placeholder while we migrate to cascading3, a few subprojects at a time
// and should eventually be folded into cascadingVersion when we merge to develop.
val cascadingThreeVersion = "3.0.3"
// Pick the elephant-bird artifact matching the cascading major version in use.
// Unknown majors abort the build rather than silently resolving a wrong artifact.
lazy val elephantbirdCascadingArtifact =
  cascadingVersion.split('.').head match {
    case major @ ("2" | "3") => s"elephant-bird-cascading$major"
    case other => sys.error(s"Unsupported cascading major version: $other")
  }

lazy val cascadingJDBCVersion =
System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0")
Expand Down Expand Up @@ -340,7 +346,7 @@ lazy val scaldingCommons = module("commons").settings(
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "chill" % chillVersion,
"com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion,
"com.twitter.elephantbird" % elephantbirdCascadingArtifact % elephantbirdVersion,
"com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion,
"com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion,
// TODO: split this out into scalding-thrift
Expand Down Expand Up @@ -401,8 +407,8 @@ lazy val scaldingParquetCascading = module("parquet-cascading").settings(
exclude("com.twitter.elephantbird", "elephant-bird-core"),
"org.apache.thrift" % "libthrift" % thriftVersion,
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"cascading" % "cascading-core" % cascadingThreeVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingThreeVersion % "provided",
"cascading" % "cascading-core" % cascadingVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingVersion % "provided",
"com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test"
)
).dependsOn(scaldingParquetFixtures % "test->test")
Expand Down Expand Up @@ -451,8 +457,8 @@ lazy val scaldingParquetScroogeCascading = module("parquet-scrooge-cascading")
.settings(
libraryDependencies ++= Seq(
// see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions
"cascading" % "cascading-core" % cascadingThreeVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingThreeVersion % "test",
"cascading" % "cascading-core" % cascadingVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingVersion % "test",
"org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests"
exclude("org.apache.parquet", "parquet-pig")
exclude("com.twitter.elephantbird", "elephant-bird-pig")
Expand Down Expand Up @@ -602,7 +608,7 @@ lazy val maple = Project(
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"org.apache.hbase" % "hbase" % hbaseVersion % "provided",
"cascading" % "cascading-hadoop" % cascadingThreeVersion % "provided"
"cascading" % "cascading-hadoop" % cascadingVersion % "provided"
)
}
)
Expand All @@ -628,6 +634,7 @@ lazy val scaldingDb = module("db").settings(
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.scala-lang" % "scala-library" % scalaVersion,
"org.scala-lang" % "scala-reflect" % scalaVersion,
"cascading" % "cascading-core" % cascadingVersion,
"com.twitter" %% "bijection-macros" % bijectionVersion
) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % "2.0.1") else Seq())
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

import cascading.flow.FlowProcess;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.hadoop.WritableSequenceFile;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
*
* Used in conjunction with VersionedKeyValSource.
*/
public class KeyValueByteScheme extends WritableSequenceFile {
public KeyValueByteScheme(Fields fields) {
Expand All @@ -29,7 +30,15 @@ public static byte[] getBytes(BytesWritable key) {
}

@Override
public boolean source(FlowProcess<JobConf> flowProcess,
// Configures the job to read this tap's input with VersionedSequenceFileInputFormat,
// which lists part files under version directories instead of assuming the
// "data"/"index" layout expected by the stock SequenceFileInputFormat.
// NOTE(review): this sets the old-API ("mapred", not "mapreduce") input-format key;
// presumably the flow runs on the mapred API — confirm against the planner in use.
public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess,
Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
super.sourceConfInit(flowProcess, tap, conf);
conf.setClass("mapred.input.format.class", VersionedSequenceFileInputFormat.class,
org.apache.hadoop.mapred.InputFormat.class);
}

@Override
public boolean source(FlowProcess<? extends Configuration> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) throws IOException {
BytesWritable key = (BytesWritable) sourceCall.getContext()[0];
BytesWritable value = (BytesWritable) sourceCall.getContext()[1];
Expand All @@ -47,7 +56,7 @@ public boolean source(FlowProcess<JobConf> flowProcess,
}

@Override
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector> sinkCall)
public void sink(FlowProcess<? extends Configuration> flowProcess, SinkCall<Void, OutputCollector> sinkCall)
throws IOException {
TupleEntry tupleEntry = sinkCall.getOutgoingEntry();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.twitter.scalding.commons.scheme;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;

/**
 * Hadoop's SequenceFileInputFormat assumes separate "data" and "index" files per directory.
 * This does not apply to VersionedKeyValSource, so we bypass that behavior: the configured
 * input paths point at version directories, and we enumerate the visible part files
 * beneath them ourselves.
 */
public class VersionedSequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

  public VersionedSequenceFileInputFormat() {
    // Never create splits smaller than a sync interval, mirroring
    // SequenceFileInputFormat's own minimum split size.
    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
  }

  // Accepts only visible entries: skips dot-files and underscore-prefixed
  // files/dirs (e.g. _SUCCESS markers).
  private final PathFilter hiddenPathFilter = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return !name.startsWith(".") && !name.startsWith("_");
    }
  };

  /**
   * Lists the part files to read for this job.
   *
   * The superclass returns the configured input paths' direct listing; for us those
   * entries are version directories (plus version token files, which are plain files
   * and therefore deliberately skipped here). We expand each directory into its
   * non-hidden part files.
   *
   * @param job the job configuration holding the input paths
   * @return the part files found under the listed version directories
   * @throws IOException if the filesystem listing fails
   */
  @Override
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    FileStatus[] parentPaths = super.listStatus(job);
    List<FileStatus> result = new ArrayList<FileStatus>();
    for (FileStatus status : parentPaths) {
      if (status.isDirectory()) {
        // add all visible files under this version directory
        FileSystem fs = status.getPath().getFileSystem(job);
        result.addAll(Arrays.asList(fs.listStatus(status.getPath(), hiddenPathFilter)));
      }
    }
    return result.toArray(new FileStatus[0]);
  }

  /** Reads key/value pairs with the stock SequenceFileRecordReader. */
  @Override // was missing: this overrides FileInputFormat.getRecordReader
  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(split.toString());
    return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.twitter.scalding.commons.datastores.VersionedStore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
Expand All @@ -13,6 +14,7 @@
import org.apache.hadoop.mapred.RecordReader;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.hadoop.Hfs;

Expand All @@ -30,7 +32,7 @@ public static enum TapMode {SOURCE, SINK}
// sink-specific
private String newVersionPath;

public VersionedTap(String dir, Scheme<JobConf,RecordReader,OutputCollector,?,?> scheme, TapMode mode)
public VersionedTap(String dir, Scheme<Configuration,RecordReader,OutputCollector,?,?> scheme, TapMode mode)
throws IOException {
super(scheme, dir);
this.mode = mode;
Expand Down Expand Up @@ -59,11 +61,11 @@ public String getOutputDirectory() {
return getPath().toString();
}

public VersionedStore getStore(JobConf conf) throws IOException {
public VersionedStore getStore(Configuration conf) throws IOException {
return new VersionedStore(FileSystem.get(conf), getOutputDirectory());
}

public String getSourcePath(JobConf conf) {
public String getSourcePath(Configuration conf) {
VersionedStore store;
try {
store = getStore(conf);
Expand All @@ -77,7 +79,7 @@ public String getSourcePath(JobConf conf) {
}
}

public String getSinkPath(JobConf conf) {
public String getSinkPath(Configuration conf) {
try {
VersionedStore store = getStore(conf);
String sinkPath = (version == null) ? store.createVersion() : store.createVersion(version);
Expand All @@ -91,33 +93,36 @@ public String getSinkPath(JobConf conf) {
}

@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
super.sourceConfInit(process, conf);
FileInputFormat.setInputPaths(conf, getSourcePath(conf));
conf.unset("mapred.input.dir"); // need this to unset any paths set in super.sourceConfInit
Path fullyQualifiedPath = getFileSystem(conf).makeQualified(new Path(getSourcePath(conf)));
HadoopUtil.addInputPath(conf, fullyQualifiedPath);
}

@Override
public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
public void sinkConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
super.sinkConfInit(process, conf);

if (newVersionPath == null)
newVersionPath = getSinkPath(conf);

FileOutputFormat.setOutputPath(conf, new Path(newVersionPath));
Path fullyQualifiedPath = getFileSystem(conf).makeQualified(new Path(newVersionPath));
HadoopUtil.setOutputPath(conf, fullyQualifiedPath);
}

@Override
public boolean resourceExists(JobConf jc) throws IOException {
public boolean resourceExists(Configuration jc) throws IOException {
return getStore(jc).mostRecentVersion() != null;
}

@Override
public boolean createResource(JobConf jc) throws IOException {
public boolean createResource(Configuration jc) throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public boolean deleteResource(JobConf jc) throws IOException {
public boolean deleteResource(Configuration jc) throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}

Expand All @@ -131,13 +136,13 @@ public String getIdentifier() {
}

@Override
public long getModifiedTime(JobConf conf) throws IOException {
public long getModifiedTime(Configuration conf) throws IOException {
VersionedStore store = getStore(conf);
return (mode == TapMode.SINK) ? 0 : store.mostRecentVersion();
}

@Override
public boolean commitResource(JobConf conf) throws IOException {
public boolean commitResource(Configuration conf) throws IOException {
VersionedStore store = new VersionedStore(FileSystem.get(conf), getOutputDirectory());

if (newVersionPath != null) {
Expand All @@ -150,7 +155,7 @@ public boolean commitResource(JobConf conf) throws IOException {
return true;
}

private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOException {
private static void markSuccessfulOutputDir(Path path, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
// create a file in the folder to mark it
if (fs.exists(path)) {
Expand All @@ -160,7 +165,7 @@ private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOEx
}

@Override
public boolean rollbackResource(JobConf conf) throws IOException {
public boolean rollbackResource(Configuration conf) throws IOException {
if (newVersionPath != null) {
getStore(conf).failVersion(newVersionPath);
newVersionPath = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.reflect.ClassTag

import com.twitter.bijection._
import com.twitter.chill.Externalizer
import com.twitter.elephantbird.cascading2.scheme.LzoBinaryScheme
import com.twitter.elephantbird.cascading3.scheme.LzoBinaryScheme
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat
import com.twitter.elephantbird.mapreduce.io.{ BinaryConverter, GenericWritable }
import com.twitter.elephantbird.mapreduce.input.{ BinaryConverterProvider, MultiInputFormat }
Expand Down Expand Up @@ -97,7 +97,7 @@ object LzoGenericScheme {
/**
* From a Binary Converter passed in configure in the JobConf using of that by ElephantBird
*/
def setConverter[M](conv: BinaryConverter[M], conf: JobConf, confKey: String, overrideConf: Boolean = false): Unit = {
def setConverter[M](conv: BinaryConverter[M], conf: Configuration, confKey: String, overrideConf: Boolean = false): Unit = {
if ((conf.get(confKey) == null) || overrideConf) {
val extern = Externalizer(conv)
try {
Expand All @@ -120,9 +120,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M])
override protected def prepareBinaryWritable(): GenericWritable[M] =
new GenericWritable(conv)

override def sourceConfInit(fp: FlowProcess[JobConf],
tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
conf: JobConf): Unit = {
override def sourceConfInit(fp: FlowProcess[_ <: Configuration],
tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]],
conf: Configuration): Unit = {

LzoGenericScheme.setConverter(conv, conf, SourceConfigBinaryConverterProvider.ProviderConfKey)
MultiInputFormat.setClassConf(clazz, conf)
Expand All @@ -131,9 +131,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M])
DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]])
}

override def sinkConfInit(fp: FlowProcess[JobConf],
tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
conf: JobConf): Unit = {
override def sinkConfInit(fp: FlowProcess[_ <: Configuration],
tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]],
conf: Configuration): Unit = {
LzoGenericScheme.setConverter(conv, conf, SinkConfigBinaryConverterProvider.ProviderConfKey)
LzoGenericBlockOutputFormat.setClassConf(clazz, conf)
LzoGenericBlockOutputFormat.setGenericConverterClassConf(classOf[SinkConfigBinaryConverterProvider[_]], conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cascading.scheme.Scheme
import org.apache.thrift.TBase
import com.google.protobuf.Message
import com.twitter.bijection.Injection
import com.twitter.elephantbird.cascading2.scheme._
import com.twitter.elephantbird.cascading3.scheme._
import com.twitter.scalding._
import com.twitter.scalding.Dsl._
import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.twitter.scalding.commons.source
import cascading.scheme.Scheme
import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited }
import cascading.scheme.local.{ TextDelimited => CLTextDelimited }
import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited
import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited
import com.twitter.scalding._
import com.twitter.scalding.source.TypedTextDelimited
import com.twitter.scalding.source.TypedSep
Expand Down
Loading