diff --git a/build.sbt b/build.sbt index 4f66de5eba..e26e31aaa8 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ val avroVersion = "1.7.4" val bijectionVersion = "0.8.1" val cascadingAvroVersion = "2.1.2" val chillVersion = "0.7.1" -val elephantbirdVersion = "4.8" +val elephantbirdVersion = "4.14-RC2" val hadoopLzoVersion = "0.4.19" val hadoopVersion = "2.5.0" val hbaseVersion = "0.94.10" @@ -222,7 +222,9 @@ lazy val scalding = Project( scaldingCommons, scaldingAvro, scaldingParquet, + scaldingParquetCascading, scaldingParquetScrooge, + scaldingParquetScroogeCascading, scaldingHRaven, scaldingRepl, scaldingJson, @@ -250,7 +252,9 @@ lazy val scaldingAssembly = Project( scaldingCommons, scaldingAvro, scaldingParquet, + scaldingParquetCascading, scaldingParquetScrooge, + scaldingParquetScroogeCascading, scaldingHRaven, scaldingRepl, scaldingJson, @@ -292,11 +296,13 @@ lazy val scaldingArgs = module("args") lazy val scaldingDate = module("date") lazy val cascadingVersion = - System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "2.6.1") + System.getenv.asScala.getOrElse("SCALDING_CASCADING_VERSION", "3.1.0-wip-52") -// This is a temporary placeholder while we migrate to cascading3, a few subprojects at a time -// and should eventually be folded into cascadingVersion when we merge to develop. 
-val cascadingThreeVersion = "3.0.3" +lazy val elephantbirdCascadingArtifact = cascadingVersion.split('.').head match { + case "2" => "elephant-bird-cascading2" + case "3" => "elephant-bird-cascading3" + case other => sys.error(s"Unsupported cascading major version: $other") +} lazy val cascadingJDBCVersion = System.getenv.asScala.getOrElse("SCALDING_CASCADING_JDBC_VERSION", "2.6.0") @@ -340,7 +346,7 @@ lazy val scaldingCommons = module("commons").settings( "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "chill" % chillVersion, - "com.twitter.elephantbird" % "elephant-bird-cascading2" % elephantbirdVersion, + "com.twitter.elephantbird" % elephantbirdCascadingArtifact % elephantbirdVersion, "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion, "com.hadoop.gplcompression" % "hadoop-lzo" % hadoopLzoVersion, // TODO: split this out into scalding-thrift @@ -401,8 +407,8 @@ lazy val scaldingParquetCascading = module("parquet-cascading").settings( exclude("com.twitter.elephantbird", "elephant-bird-core"), "org.apache.thrift" % "libthrift" % thriftVersion, "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", - "cascading" % "cascading-core" % cascadingThreeVersion % "provided", - "cascading" % "cascading-hadoop" % cascadingThreeVersion % "provided", + "cascading" % "cascading-core" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "provided", "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test" ) ).dependsOn(scaldingParquetFixtures % "test->test") @@ -451,8 +457,8 @@ lazy val scaldingParquetScroogeCascading = module("parquet-scrooge-cascading") .settings( libraryDependencies ++= Seq( // see https://issues.apache.org/jira/browse/PARQUET-143 for exclusions - "cascading" % "cascading-core" % cascadingThreeVersion % "provided", - "cascading" % "cascading-hadoop" % cascadingThreeVersion % "test", + 
"cascading" % "cascading-core" % cascadingVersion % "provided", + "cascading" % "cascading-hadoop" % cascadingVersion % "test", "org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests" exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") @@ -602,7 +608,7 @@ lazy val maple = Project( libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.hbase" % "hbase" % hbaseVersion % "provided", - "cascading" % "cascading-hadoop" % cascadingThreeVersion % "provided" + "cascading" % "cascading-hadoop" % cascadingVersion % "provided" ) } ) @@ -628,6 +634,7 @@ lazy val scaldingDb = module("db").settings( libraryDependencies <++= (scalaVersion) { scalaVersion => Seq( "org.scala-lang" % "scala-library" % scalaVersion, "org.scala-lang" % "scala-reflect" % scalaVersion, + "cascading" % "cascading-core" % cascadingVersion, "com.twitter" %% "bijection-macros" % bijectionVersion ) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % "2.0.1") else Seq()) }, diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java index bf95160efb..e1add5bbb2 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/KeyValueByteScheme.java @@ -3,8 +3,8 @@ import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; @@ -12,12 +12,13 @@ import cascading.scheme.SinkCall; import cascading.scheme.SourceCall; import cascading.scheme.hadoop.WritableSequenceFile; 
+import cascading.tap.Tap; import cascading.tuple.Fields; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; /** - * + * Used in conjunction with VersionedKeyValSource. */ public class KeyValueByteScheme extends WritableSequenceFile { public KeyValueByteScheme(Fields fields) { @@ -29,7 +30,15 @@ public static byte[] getBytes(BytesWritable key) { } @Override - public boolean source(FlowProcess flowProcess, + public void sourceConfInit(FlowProcess flowProcess, + Tap tap, Configuration conf) { + super.sourceConfInit(flowProcess, tap, conf); + conf.setClass("mapred.input.format.class", VersionedSequenceFileInputFormat.class, + org.apache.hadoop.mapred.InputFormat.class); + } + + @Override + public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { BytesWritable key = (BytesWritable) sourceCall.getContext()[0]; BytesWritable value = (BytesWritable) sourceCall.getContext()[1]; @@ -47,7 +56,7 @@ public boolean source(FlowProcess flowProcess, } @Override - public void sink(FlowProcess flowProcess, SinkCall sinkCall) + public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/VersionedSequenceFileInputFormat.java b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/VersionedSequenceFileInputFormat.java new file mode 100644 index 0000000000..9166306701 --- /dev/null +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/scheme/VersionedSequenceFileInputFormat.java @@ -0,0 +1,64 @@ +package com.twitter.scalding.commons.scheme; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.SequenceFile; +import 
org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileRecordReader; + +/** + * Hadoop's SequenceFileInputFormat assumes separate "data" and "index" files per directory. + * This does not apply to VersionedKeyValSource, so we bypass that behavior. + */ +public class VersionedSequenceFileInputFormat extends FileInputFormat { + + public VersionedSequenceFileInputFormat() { + setMinSplitSize(SequenceFile.SYNC_INTERVAL); + } + + private final PathFilter hiddenPathFilter = new PathFilter() { + // avoid hidden files and directories. + @Override + public boolean accept(Path path) { + String name = path.getName(); + return !name.startsWith(".") && !name.startsWith("_"); + } + }; + + @Override + protected FileStatus[] listStatus(JobConf job) throws IOException { + // we pick all the parent directories (should be only one for the picked version) + // and fetch the part files (non-hidden) under them + // any files in the parent list are version files which are to be disregarded + FileStatus[] parentPaths = super.listStatus(job); + List result = new ArrayList(); + for (int i = 0; i < parentPaths.length; i++) { + FileStatus status = parentPaths[i]; + if (status.isDirectory()) { + // add all files under this dir + FileSystem fs = status.getPath().getFileSystem(job); + result.addAll(Arrays.asList(fs.listStatus(status.getPath(), hiddenPathFilter))); + } + } + return result.toArray(new FileStatus[0]); + } + + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { + reporter.setStatus(split.toString()); + return new SequenceFileRecordReader(job, (FileSplit) split); + } +} + diff --git a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java
b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java index 21b1f7fbb3..c3105e8801 100644 --- a/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java +++ b/scalding-commons/src/main/java/com/twitter/scalding/commons/tap/VersionedTap.java @@ -4,6 +4,7 @@ import com.twitter.scalding.commons.datastores.VersionedStore; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; @@ -13,6 +14,7 @@ import org.apache.hadoop.mapred.RecordReader; import cascading.flow.FlowProcess; +import cascading.flow.hadoop.util.HadoopUtil; import cascading.scheme.Scheme; import cascading.tap.hadoop.Hfs; @@ -30,7 +32,7 @@ public static enum TapMode {SOURCE, SINK} // sink-specific private String newVersionPath; - public VersionedTap(String dir, Scheme scheme, TapMode mode) + public VersionedTap(String dir, Scheme scheme, TapMode mode) throws IOException { super(scheme, dir); this.mode = mode; @@ -59,11 +61,11 @@ public String getOutputDirectory() { return getPath().toString(); } - public VersionedStore getStore(JobConf conf) throws IOException { + public VersionedStore getStore(Configuration conf) throws IOException { return new VersionedStore(FileSystem.get(conf), getOutputDirectory()); } - public String getSourcePath(JobConf conf) { + public String getSourcePath(Configuration conf) { VersionedStore store; try { store = getStore(conf); @@ -77,7 +79,7 @@ public String getSourcePath(JobConf conf) { } } - public String getSinkPath(JobConf conf) { + public String getSinkPath(Configuration conf) { try { VersionedStore store = getStore(conf); String sinkPath = (version == null) ? 
store.createVersion() : store.createVersion(version); @@ -91,33 +93,36 @@ public String getSinkPath(JobConf conf) { } @Override - public void sourceConfInit(FlowProcess process, JobConf conf) { + public void sourceConfInit(FlowProcess process, Configuration conf) { super.sourceConfInit(process, conf); - FileInputFormat.setInputPaths(conf, getSourcePath(conf)); + conf.unset("mapred.input.dir"); // need this to unset any paths set in super.sourceConfInit + Path fullyQualifiedPath = getFileSystem(conf).makeQualified(new Path(getSourcePath(conf))); + HadoopUtil.addInputPath(conf, fullyQualifiedPath); } @Override - public void sinkConfInit(FlowProcess process, JobConf conf) { + public void sinkConfInit(FlowProcess process, Configuration conf) { super.sinkConfInit(process, conf); if (newVersionPath == null) newVersionPath = getSinkPath(conf); - FileOutputFormat.setOutputPath(conf, new Path(newVersionPath)); + Path fullyQualifiedPath = getFileSystem(conf).makeQualified(new Path(newVersionPath)); + HadoopUtil.setOutputPath(conf, fullyQualifiedPath); } @Override - public boolean resourceExists(JobConf jc) throws IOException { + public boolean resourceExists(Configuration jc) throws IOException { return getStore(jc).mostRecentVersion() != null; } @Override - public boolean createResource(JobConf jc) throws IOException { + public boolean createResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @Override - public boolean deleteResource(JobConf jc) throws IOException { + public boolean deleteResource(Configuration jc) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } @@ -131,13 +136,13 @@ public String getIdentifier() { } @Override - public long getModifiedTime(JobConf conf) throws IOException { + public long getModifiedTime(Configuration conf) throws IOException { VersionedStore store = getStore(conf); return (mode == TapMode.SINK) ? 
0 : store.mostRecentVersion(); } @Override - public boolean commitResource(JobConf conf) throws IOException { + public boolean commitResource(Configuration conf) throws IOException { VersionedStore store = new VersionedStore(FileSystem.get(conf), getOutputDirectory()); if (newVersionPath != null) { @@ -150,7 +155,7 @@ public boolean commitResource(JobConf conf) throws IOException { return true; } - private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOException { + private static void markSuccessfulOutputDir(Path path, Configuration conf) throws IOException { FileSystem fs = FileSystem.get(conf); // create a file in the folder to mark it if (fs.exists(path)) { @@ -160,7 +165,7 @@ private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOEx } @Override - public boolean rollbackResource(JobConf conf) throws IOException { + public boolean rollbackResource(Configuration conf) throws IOException { if (newVersionPath != null) { getStore(conf).failVersion(newVersionPath); newVersionPath = null; diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala index 25a6c4e0cf..3e33c75430 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoGenericScheme.scala @@ -20,7 +20,7 @@ import scala.reflect.ClassTag import com.twitter.bijection._ import com.twitter.chill.Externalizer -import com.twitter.elephantbird.cascading2.scheme.LzoBinaryScheme +import com.twitter.elephantbird.cascading3.scheme.LzoBinaryScheme import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat import com.twitter.elephantbird.mapreduce.io.{ BinaryConverter, GenericWritable } import com.twitter.elephantbird.mapreduce.input.{ BinaryConverterProvider, MultiInputFormat } @@ -97,7 
+97,7 @@ object LzoGenericScheme { /** * From a Binary Converter passed in configure in the JobConf using of that by ElephantBird */ - def setConverter[M](conv: BinaryConverter[M], conf: JobConf, confKey: String, overrideConf: Boolean = false): Unit = { + def setConverter[M](conv: BinaryConverter[M], conf: Configuration, confKey: String, overrideConf: Boolean = false): Unit = { if ((conf.get(confKey) == null) || overrideConf) { val extern = Externalizer(conv) try { @@ -120,9 +120,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) override protected def prepareBinaryWritable(): GenericWritable[M] = new GenericWritable(conv) - override def sourceConfInit(fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf): Unit = { + override def sourceConfInit(fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration): Unit = { LzoGenericScheme.setConverter(conv, conf, SourceConfigBinaryConverterProvider.ProviderConfKey) MultiInputFormat.setClassConf(clazz, conf) @@ -131,9 +131,9 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M]) DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]]) } - override def sinkConfInit(fp: FlowProcess[JobConf], - tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], - conf: JobConf): Unit = { + override def sinkConfInit(fp: FlowProcess[_ <: Configuration], + tap: Tap[Configuration, RecordReader[_, _], OutputCollector[_, _]], + conf: Configuration): Unit = { LzoGenericScheme.setConverter(conv, conf, SinkConfigBinaryConverterProvider.ProviderConfKey) LzoGenericBlockOutputFormat.setClassConf(clazz, conf) LzoGenericBlockOutputFormat.setGenericConverterClassConf(classOf[SinkConfigBinaryConverterProvider[_]], conf) diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala 
b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala index eeb28fc929..def9bc1673 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTraits.scala @@ -22,7 +22,7 @@ import cascading.scheme.Scheme import org.apache.thrift.TBase import com.google.protobuf.Message import com.twitter.bijection.Injection -import com.twitter.elephantbird.cascading2.scheme._ +import com.twitter.elephantbird.cascading3.scheme._ import com.twitter.scalding._ import com.twitter.scalding.Dsl._ import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala index 089968b9a4..83bc91907a 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/LzoTypedText.scala @@ -3,7 +3,7 @@ package com.twitter.scalding.commons.source import cascading.scheme.Scheme import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited } import cascading.scheme.local.{ TextDelimited => CLTextDelimited } -import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited +import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited import com.twitter.scalding._ import com.twitter.scalding.source.TypedTextDelimited import com.twitter.scalding.source.TypedSep diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala index 177bb3c416..22a3a9dde9 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala +++ 
b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala @@ -32,7 +32,7 @@ import com.twitter.scalding.commons.tap.VersionedTap.TapMode import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } import com.twitter.scalding.typed.KeyedListLike import com.twitter.scalding.typed.TypedSink -import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } import scala.collection.JavaConverters._ /** @@ -69,7 +69,7 @@ class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Lo override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K, V)]) - def hdfsScheme = + def hdfsScheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _] = HadoopSchemeInstance(new KeyValueByteScheme(fields).asInstanceOf[Scheme[_, _, _, _, _]]) @deprecated("This method is deprecated", "0.1.6") @@ -77,7 +77,7 @@ class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Lo this(path, sourceVersion, sinkVersion, maxFailures, VersionedKeyValSource.defaultVersionsToKeep)(codec) def getTap(mode: TapMode) = { - val tap = new VersionedTap(path, hdfsScheme, mode).setVersionsToKeep(versionsToKeep) + val tap = new VersionedTap(path, Hadoop2SchemeInstance(hdfsScheme), mode).setVersionsToKeep(versionsToKeep) if (mode == TapMode.SOURCE && sourceVersion.isDefined) tap.setVersion(sourceVersion.get) else if (mode == TapMode.SINK && sinkVersion.isDefined) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index 9fa95b0e2c..7f115ed5dc 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -22,7 +22,7 @@ import com.twitter.chill.{ ExternalizerCodec, ExternalizerInjection, Externalize import com.twitter.chill.config.{ ScalaMapConfig, 
ConfiguredInstantiator } import com.twitter.bijection.{ Base64String, Injection } -import cascading.pipe.assembly.AggregateBy +import cascading.pipe.assembly.AggregateByProps import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy } import cascading.property.AppProps import cascading.tuple.collect.SpillableProps @@ -109,7 +109,7 @@ trait Config extends Serializable { * the best results */ def setMapSideAggregationThreshold(count: Int): Config = - this + (AggregateBy.AGGREGATE_BY_THRESHOLD -> count.toString) + this + (AggregateByProps.AGGREGATE_BY_CAPACITY -> count.toString) /** * Set this configuration option to require all grouping/cogrouping @@ -369,6 +369,9 @@ trait Config extends Serializable { object Config { val CascadingAppName: String = "cascading.app.name" val CascadingAppId: String = "cascading.app.id" + // This is the old config AGGREGATE_BY_THRESHOLD which is no longer present in cascading3 + // We maintain our own copy to provide backward compatibility + val CascadingAggregateByThreshold = "cascading.aggregateby.threshold" val CascadingSerializationTokens = "cascading.serialization.tokens" val IoSerializationsKey: String = "io.serializations" val ScaldingFlowClassName: String = "scalding.flow.class.name" diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 5481e0a3aa..ad21d3e91b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -803,7 +803,7 @@ object ExecutionCounters { * Just gets the counters from the CascadingStats and ignores * all the other fields present */ - def fromCascading(cs: cascading.stats.CascadingStats): ExecutionCounters = new ExecutionCounters { + def fromCascading(cs: cascading.stats.CascadingStats[_]): ExecutionCounters = new ExecutionCounters { import scala.collection.JavaConverters._ val keys = (for { 
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionApp.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionApp.scala index 3a545cafe0..42df365bd4 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionApp.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionApp.scala @@ -41,10 +41,12 @@ object ExecutionApp { def extractUserHadoopArgs(args: Array[String]): (HadoopArgs, NonHadoopArgs) = { + val argsWithLibJars = ExpandLibJarsGlobs(args) + // This adds a look back mechanism to match on other hadoop args we need to support // currently thats just libjars val (hadoopArgs, tmpNonHadoop, finalLast) = - args.foldLeft(Array[String](), Array[String](), Option.empty[String]) { + argsWithLibJars.foldLeft(Array[String](), Array[String](), Option.empty[String]) { // Current is a -D, so store the last in non hadoop, and add current to hadoop args case ((hadoopArgs, nonHadoop, Some(l)), current) if dArgPattern.findFirstIn(current).isDefined => (hadoopArgs :+ current, nonHadoop :+ l, None) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala index 8c0497866a..74699a4929 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/ExecutionContext.scala @@ -151,7 +151,7 @@ object ExecutionContext { private val LOG: Logger = LoggerFactory.getLogger(ExecutionContext.getClass) private[scalding] def getDesc[T](baseFlowStep: BaseFlowStep[T]): Seq[String] = { - baseFlowStep.getGraph.vertexSet.asScala.toSeq.flatMap(_ match { + baseFlowStep.getElementGraph.vertexSet.asScala.toSeq.flatMap(_ match { case pipe: Pipe => RichPipe.getPipeDescriptions(pipe) case _ => List() // no descriptions }) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala 
index fb98f62a98..92a2e9bb6a 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala @@ -64,7 +64,9 @@ trait HfsTapProvider { def createHfsTap(scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode): Hfs = - new Hfs(scheme, path, sinkMode) + new Hfs( + Hadoop2SchemeInstance(scheme), + path, sinkMode) } private[scalding] object CastFileTap { @@ -404,10 +406,7 @@ trait SuccessFileSource extends FileSource { trait LocalTapSource extends LocalSourceOverride { override def createLocalTap(sinkMode: SinkMode): Tap[JobConf, _, _] = { val taps = localPaths.map { p => - // temporary workaround. Remove when scalding-core is migrated to cascading3. - val scheme = hdfsScheme.asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] - // end temporary workaround - new LocalTap(p, scheme, sinkMode).asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] + new LocalTap(p, Hadoop2SchemeInstance(hdfsScheme), sinkMode).asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] }.toSeq taps match { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala index 230378d31f..74881e3930 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/HfsConfPropertySetter.scala @@ -15,19 +15,20 @@ limitations under the License. 
*/ package com.twitter.scalding +import cascading.flow.FlowProcess +import cascading.scheme.Scheme import cascading.tap.hadoop.Hfs import cascading.tap.SinkMode +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf -import cascading.flow.FlowProcess import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.OutputCollector -import cascading.scheme.Scheme private[scalding] class ConfPropertiesHfsTap(config: Config, - scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], + scheme: Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _], stringPath: String, sinkMode: SinkMode) extends Hfs(scheme, stringPath, sinkMode) { - override def sourceConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sourceConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { config.toMap.foreach { case (k, v) => conf.set(k, v) @@ -35,7 +36,7 @@ private[scalding] class ConfPropertiesHfsTap(config: Config, super.sourceConfInit(process, conf) } - override def sinkConfInit(process: FlowProcess[JobConf], conf: JobConf): Unit = { + override def sinkConfInit(process: FlowProcess[_ <: Configuration], conf: Configuration): Unit = { config.toMap.foreach { case (k, v) => conf.set(k, v) @@ -58,5 +59,5 @@ trait HfsConfPropertySetter extends HfsTapProvider { scheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _], path: String, sinkMode: SinkMode): Hfs = - new ConfPropertiesHfsTap(tapConfig, scheme, path, sinkMode) -} \ No newline at end of file + new ConfPropertiesHfsTap(tapConfig, Hadoop2SchemeInstance(scheme), path, sinkMode) +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala index 787986e5a6..9fada4ac36 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Job.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Job.scala @@ -254,7 +254,7 @@ 
class Job(val args: Args) extends FieldConversions with java.io.Serializable { FlowStateMap.clear(flowDef) } - protected def handleStats(statsData: CascadingStats) { + protected def handleStats(statsData: CascadingStats[_]) { scaldingCascadingStats = Some(statsData) // TODO: Why the two ways to do stats? Answer: jank-den. if (args.boolean("scalding.flowstats")) { @@ -281,7 +281,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable { // This awful name is designed to avoid collision // with subclasses @transient - private[scalding] var scaldingCascadingStats: Option[CascadingStats] = None + private[scalding] var scaldingCascadingStats: Option[CascadingStats[_]] = None /** * Save the Flow object after a run to allow clients to inspect the job. diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala index dac1f1a720..33584813a2 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala @@ -21,7 +21,7 @@ import cascading.stats.{ CascadeStats, CascadingStats, FlowStats } import scala.util.{ Failure, Try } object JobStats { - def apply(stats: CascadingStats): JobStats = { + def apply(stats: CascadingStats[_]): JobStats = { val m = statsMap(stats) new JobStats( stats match { @@ -30,14 +30,14 @@ object JobStats { }) } - private def counterMap(stats: CascadingStats): Map[String, Map[String, Long]] = + private def counterMap(stats: CascadingStats[_]): Map[String, Map[String, Long]] = stats.getCounterGroups.asScala.map { group => (group, stats.getCountersFor(group).asScala.map { counter => (counter, stats.getCounterValue(group, counter)) }.toMap) }.toMap - private def statsMap(stats: CascadingStats): Map[String, Any] = + private def statsMap(stats: CascadingStats[_]): Map[String, Any] = Map( "counters" -> counterMap(stats), "duration" -> stats.getDuration, diff --git 
a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala index 148f3b0eb1..874f84b19b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JobTest.scala @@ -59,7 +59,7 @@ object CascadeTest { class JobTest(cons: (Args) => Job) { private var argsMap = Map[String, List[String]]() private val callbacks = Buffer[() => Unit]() - private val statsCallbacks = Buffer[(CascadingStats) => Unit]() + private val statsCallbacks = Buffer[(CascadingStats[_]) => Unit]() // TODO: Switch the following maps and sets from Source to String keys // to guard for scala equality bugs private var sourceMap: (Source) => Option[Buffer[Tuple]] = { _ => None } @@ -124,13 +124,13 @@ class JobTest(cons: (Args) => Job) { // If this test is checking for multiple jobs chained by next, this only checks // for the counters in the final job's FlowStat. def counter(counter: String, group: String = Stats.ScaldingGroup)(op: Long => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getCounterValue(counter, group)(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getCounterValue(counter, group)(stats))) this } // Used to check an assertion on all custom counters of a given scalding job. def counters(op: Map[String, Long] => Unit) = { - statsCallbacks += ((stats: CascadingStats) => op(Stats.getAllCustomCounters()(stats))) + statsCallbacks += ((stats: CascadingStats[_]) => op(Stats.getAllCustomCounters()(stats))) this } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/LibJarsExpansion.scala b/scalding-core/src/main/scala/com/twitter/scalding/LibJarsExpansion.scala new file mode 100644 index 0000000000..b8863a4dab --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/LibJarsExpansion.scala @@ -0,0 +1,65 @@ +/* +Copyright 2014 Twitter, Inc. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.scalding + +import java.io.File +import java.nio.file.Path + +object ExpandLibJarsGlobs { + def apply(inputArgs: Array[String]): Array[String] = { + // First we are going to expand out the libjars if we find it + val libJarsIdx = inputArgs.indexOf("-libjars") + 1 + if (libJarsIdx > 0 && libJarsIdx < inputArgs.length) { // 0 would mean we never found -libjars + val newArgs = new Array[String](inputArgs.length) + System.arraycopy(inputArgs, 0, newArgs, 0, inputArgs.length) + + val existing = newArgs(libJarsIdx) + val replacement = existing.split(",").flatMap { element => + fromGlob(element).map(_.toString) + }.mkString(",") + + newArgs(libJarsIdx) = replacement + newArgs + } else inputArgs + } + + //tree from Duncan McGregor @ http://stackoverflow.com/questions/2637643/how-do-i-list-all-files-in-a-subdirectory-in-scala + private[this] def tree(root: File, skipHidden: Boolean = false): Stream[File] = + if (!root.exists || (skipHidden && root.isHidden)) Stream.empty + else root #:: ( + root.listFiles match { + case null => Stream.empty + case files => files.toStream.flatMap(tree(_, skipHidden)) + }) + + def fromGlob(glob: String, filesOnly: Boolean = true): Stream[Path] = { + import java.nio._ + import java.nio.file._ + val fs = FileSystems.getDefault() + val expandedSlash = if (glob.endsWith("/")) s"${glob}/*" else glob + val absoluteGlob = fs.getPath(expandedSlash).toAbsolutePath + val matcher: PathMatcher = 
fs.getPathMatcher(s"glob:$absoluteGlob") + + val parentPath = + if (absoluteGlob.getFileName.toString.contains("*")) absoluteGlob.getParent else absoluteGlob + + val pathStream = tree(parentPath.toFile).map(_.toPath) + + val globMatchingPaths = pathStream.filter(matcher.matches) + + if (filesOnly) globMatchingPaths.filter(_.toFile.isFile) else globMatchingPaths + } +} \ No newline at end of file diff --git a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala index 896c63496a..e754e784ab 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/MemoryTap.scala @@ -44,11 +44,11 @@ class MemoryTap[In, Out](val scheme: Scheme[Properties, In, Out, _, _], val tupl override def getModifiedTime(conf: Properties) = if (resourceExists(conf)) modifiedTime else 0L override lazy val getIdentifier: String = scala.math.random.toString - override def openForRead(flowProcess: FlowProcess[Properties], input: In) = { + override def openForRead(flowProcess: FlowProcess[_ <: Properties], input: In) = { new TupleEntryChainIterator(scheme.getSourceFields, tupleBuffer.toIterator.asJava) } - override def openForWrite(flowProcess: FlowProcess[Properties], output: Out): TupleEntryCollector = { + override def openForWrite(flowProcess: FlowProcess[_ <: Properties], output: Out): TupleEntryCollector = { tupleBuffer.clear new MemoryTupleEntryCollector(tupleBuffer, this) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 578c76639c..62c17f2940 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -18,7 +18,7 @@ package com.twitter.scalding { import cascading.operation._ import cascading.tuple._ import cascading.flow._ - import 
cascading.pipe.assembly.AggregateBy + import cascading.pipe.assembly.{ AggregateBy, AggregateByProps } import com.twitter.chill.MeatLocker import scala.collection.JavaConverters._ @@ -134,15 +134,23 @@ package com.twitter.scalding { val boxedSemigroup = Externalizer(commutativeSemigroup) val DEFAULT_CACHE_SIZE = 100000 - val SIZE_CONFIG_KEY = AggregateBy.AGGREGATE_BY_THRESHOLD + val CASCADING2_SIZE_CONFIG_KEY = Config.CascadingAggregateByThreshold + val CASCADING3_SIZE_CONFIG_KEY = AggregateByProps.AGGREGATE_BY_CAPACITY def cacheSize(fp: FlowProcess[_]): Int = cacheSize.orElse { - Option(fp.getStringProperty(SIZE_CONFIG_KEY)) - .filterNot { _.isEmpty } - .map { _.toInt } - } - .getOrElse(DEFAULT_CACHE_SIZE) + def getInt(k: String): Option[Int] = Option(fp.getStringProperty(k)).filterNot(_.isEmpty).map(_.toInt) + val cascading2Property = getInt(CASCADING2_SIZE_CONFIG_KEY) + val cascading3Property = getInt(CASCADING3_SIZE_CONFIG_KEY) + // we support both old and new properties for backward compatibility + // and pick the max of the two, when both exist + (cascading2Property, cascading3Property) match { + case (Some(a), Some(b)) => Some(Ordering[Int].max(a, b)) + case (None, None) => None + case (Some(a), _) => Some(a) + case (_, Some(b)) => Some(b) + } + }.getOrElse(DEFAULT_CACHE_SIZE) override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[SummingCache[Tuple, V]]) { //Set up the context: diff --git a/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala index de5661783c..88030f98d0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/PartitionSource.scala @@ -91,8 +91,7 @@ abstract class PartitionSource(val openWritesThreshold: Option[Int] = None) exte /** * An implementation of TSV output, split over a partition tap. 
* - * Similar to TemplateSource, but with addition of tsvFields, to - * let users explicitly specify which fields they want to see in + * tsvFields lets users explicitly specify which fields they want to see in * the TSV (allows user to discard path fields). * * apply assumes user wants a DelimitedPartition (the only diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala index 094613235c..73c4f645d9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Source.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Source.scala @@ -28,6 +28,7 @@ import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry, TupleEntryCollecto import cascading.pipe.Pipe +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.RecordReader @@ -59,7 +60,7 @@ class InvalidSourceTap(val hdfsPaths: Iterable[String]) extends SourceTap[JobCon override def getModifiedTime(conf: JobConf): Long = 0L - override def openForRead(flow: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator = + override def openForRead(flow: FlowProcess[_ <: JobConf], input: RecordReader[_, _]): TupleEntryIterator = sys.error(s"InvalidSourceTap: No good paths in $hdfsPaths") override def resourceExists(conf: JobConf): Boolean = false @@ -75,8 +76,10 @@ class InvalidSourceTap(val hdfsPaths: Iterable[String]) extends SourceTap[JobCon // 4. source.validateTaps (throws InvalidSourceException) // In the worst case if the flow plan is misconfigured, // openForRead on mappers should fail when using this tap. 
- override def sourceConfInit(flow: FlowProcess[JobConf], conf: JobConf): Unit = { - conf.setInputFormat(classOf[cascading.tap.hadoop.io.MultiInputFormat]) + override def sourceConfInit(flow: FlowProcess[_ <: JobConf], conf: JobConf): Unit = { + conf.setClass("mapred.input.format.class", + classOf[cascading.tap.hadoop.io.MultiInputFormat], + classOf[org.apache.hadoop.mapred.InputFormat[_, _]]); super.sourceConfInit(flow, conf) } } @@ -97,6 +100,11 @@ object HadoopSchemeInstance { scheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] } +object Hadoop2SchemeInstance { + def apply(scheme: Scheme[_, _, _, _, _]) = + scheme.asInstanceOf[Scheme[Configuration, RecordReader[_, _], OutputCollector[_, _], _, _]] +} + object CastHfsTap { // The scala compiler has problems with the generics in Cascading def apply(tap: Hfs): Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]] = @@ -251,7 +259,7 @@ class NullTap[Config, Input, Output, SourceContext, SinkContext] SinkMode.UPDATE) { def getIdentifier = "nullTap" - def openForWrite(flowProcess: FlowProcess[Config], output: Output) = + def openForWrite(flowProcess: FlowProcess[_ <: Config], output: Output) = new TupleEntryCollector { override def add(te: TupleEntry) {} override def add(t: CTuple) {} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala index 94871425e0..7eb6d45745 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala @@ -62,11 +62,11 @@ object Stats { // When getting a counter value, cascadeStats takes precedence (if set) and // flowStats is used after that. Returns None if neither is defined. 
- def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats): Long = + def getCounterValue(key: StatKey)(implicit cascadingStats: CascadingStats[_]): Long = cascadingStats.getCounterValue(key.group, key.counter) // Returns a map of all custom counter names and their counts. - def getAllCustomCounters()(implicit cascadingStats: CascadingStats): Map[String, Long] = { + def getAllCustomCounters()(implicit cascadingStats: CascadingStats[_]): Map[String, Long] = { val counts = for { counter <- cascadingStats.getCountersFor(ScaldingGroup).asScala value = getCounterValue(counter) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala deleted file mode 100644 index 32ed32ed0b..0000000000 --- a/scalding-core/src/main/scala/com/twitter/scalding/TemplateSource.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* -Copyright 2013 Inkling, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package com.twitter.scalding - -import cascading.tap.hadoop.Hfs -import cascading.tap.hadoop.{ TemplateTap => HTemplateTap } -import cascading.tap.local.FileTap -import cascading.tap.local.{ TemplateTap => LTemplateTap } -import cascading.tap.SinkMode -import cascading.tap.Tap -import cascading.tuple.Fields - -/** - * This is a base class for template based output sources - */ -abstract class TemplateSource extends SchemedSource with HfsTapProvider { - - // The root path of the templated output. 
- def basePath: String - // The template as a java Formatter string. e.g. %s/%s for a two part template. - def template: String - // The fields to apply to the template. - def pathFields: Fields = Fields.ALL - - /** - * Creates the template tap. - * - * @param readOrWrite Describes if this source is being read from or written to. - * @param mode The mode of the job. (implicit) - * - * @returns A cascading TemplateTap. - */ - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { - readOrWrite match { - case Read => throw new InvalidSourceException("Cannot use TemplateSource for input") - case Write => { - mode match { - case Local(_) => { - val localTap = new FileTap(localScheme, basePath, sinkMode) - new LTemplateTap(localTap, template, pathFields) - } - case hdfsMode @ Hdfs(_, _) => { - val hfsTap = createHfsTap(hdfsScheme, basePath, sinkMode) - new HTemplateTap(hfsTap, template, pathFields) - } - case hdfsTest @ HadoopTest(_, _) => { - val hfsTap = createHfsTap(hdfsScheme, hdfsTest.getWritePathFor(this), sinkMode) - new HTemplateTap(hfsTap, template, pathFields) - } - case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite) - } - } - } - } - - /** - * Validates the taps, makes sure there are no nulls as the path or template. - * - * @param mode The mode of the job. - */ - override def validateTaps(mode: Mode): Unit = { - if (basePath == null) { - throw new InvalidSourceException("basePath cannot be null for TemplateTap") - } else if (template == null) { - throw new InvalidSourceException("template cannot be null for TemplateTap") - } - } -} - -/** - * An implementation of TSV output, split over a template tap. - * - * @param basePath The root path for the output. - * @param template The java formatter style string to use as the template. e.g. %s/%s. - * @param pathFields The set of fields to apply to the path. - * @param writeHeader Flag to indicate that the header should be written to the file. 
- * @param sinkMode How to handle conflicts with existing output. - * @param fields The set of fields to apply to the output. - */ -case class TemplatedTsv( - override val basePath: String, - override val template: String, - override val pathFields: Fields = Fields.ALL, - override val writeHeader: Boolean = false, - override val sinkMode: SinkMode = SinkMode.REPLACE, - override val fields: Fields = Fields.ALL) - extends TemplateSource with DelimitedScheme - -/** - * An implementation of SequenceFile output, split over a template tap. - * - * @param basePath The root path for the output. - * @param template The java formatter style string to use as the template. e.g. %s/%s. - * @param sequenceFields The set of fields to use for the sequence file. - * @param pathFields The set of fields to apply to the path. - * @param sinkMode How to handle conflicts with existing output. - */ -case class TemplatedSequenceFile( - override val basePath: String, - override val template: String, - val sequenceFields: Fields = Fields.ALL, - override val pathFields: Fields = Fields.ALL, - override val sinkMode: SinkMode = SinkMode.REPLACE) - extends TemplateSource with SequenceFileScheme { - - override val fields = sequenceFields -} - diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala index db3426a9ff..92d7698745 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TestTapFactory.scala @@ -25,6 +25,7 @@ import cascading.scheme.NullScheme import java.io.{ Serializable, InputStream, OutputStream } +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputCollector import org.apache.hadoop.mapred.RecordReader @@ -50,7 +51,7 @@ object TestTapFactory extends Serializable { new TestTapFactory(src, sinkMode) { override def hdfsScheme = 
Some(scheme) } } -class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable { +class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable with HfsTapProvider { def sourceFields: Fields = hdfsScheme.map { _.getSourceFields }.getOrElse(sys.error("No sourceFields defined")) @@ -93,12 +94,12 @@ class TestTapFactory(src: Source, sinkMode: SinkMode) extends Serializable { val fields = sourceFields (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf, _, _]] } else { - CastHfsTap(new Hfs(hdfsScheme.get, hdfsTest.getWritePathFor(src), sinkMode)) + CastHfsTap(createHfsTap(hdfsScheme.get, hdfsTest.getWritePathFor(src), sinkMode)) } } case Write => { val path = hdfsTest.getWritePathFor(src) - CastHfsTap(new Hfs(hdfsScheme.get, path, sinkMode)) + CastHfsTap(createHfsTap(hdfsScheme.get, path, sinkMode)) } } case _ => { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala b/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala index 238edbc712..f8dcd2bacf 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Tool.scala @@ -147,7 +147,7 @@ class Tool extends Configured with HTool { object Tool { def main(args: Array[String]) { try { - ToolRunner.run(new JobConf, new Tool, args) + ToolRunner.run(new JobConf, new Tool, ExpandLibJarsGlobs(args)) } catch { case t: Throwable => { //re-throw the exception with extra info diff --git a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala index d433fa24fd..d2bd5e9841 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/reducer_estimation/Common.scala @@ -36,7 +36,7 @@ object Common { } def unrollTaps(step: FlowStep[JobConf]): Seq[Tap[_, _, _]] = - 
unrollTaps(step.getSources.asScala.toSeq) + unrollTaps(step.getFlowNodeGraph.getSourceTaps.asScala.toSeq) /** * Get the total size of the file(s) specified by the Hfs, which may contain a glob diff --git a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala index 71777fad60..ec1f2d9c90 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/serialization/CascadingBinaryComparator.scala @@ -77,7 +77,7 @@ object CascadingBinaryComparator { def getDescriptionsForMissingOrdSer[U](bfs: BaseFlowStep[U]): Option[String] = // does this job have any Splices without OrderedSerialization: - if (bfs.getGraph.vertexSet.asScala.exists { + if (bfs.getElementGraph.vertexSet.asScala.exists { case gb: GroupBy => check(gb).isFailure case cg: CoGroup => check(cg).isFailure case _ => false // only do sorting in groupBy/cogroupBy diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ExpandLibJarsGlobsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ExpandLibJarsGlobsTest.scala new file mode 100644 index 0000000000..8bae8242de --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/ExpandLibJarsGlobsTest.scala @@ -0,0 +1,51 @@ +package com.twitter.scalding + +import java.io.File +import java.nio._ +import java.nio.file._ +import org.scalatest.{ Matchers, WordSpec } + +class ExpandLibJarsGlobsTest extends WordSpec with Matchers { + def touch(parent: File, p: String): String = { + val f = new File(parent, p) + f.createNewFile + f.getAbsolutePath + } + + "ExpandLibJarsGlobs" should { + "expand entries" in { + val tmpRoot = new File(System.getProperty("java.io.tmpdir"), System.currentTimeMillis.toString) + require(tmpRoot.mkdirs(), "Failed to make temporary directory") + tmpRoot.deleteOnExit() + + // Has a 
side effect, but returns us the jars absolute paths + val jars = (0 until 20).map { idx => + touch(tmpRoot, s"myF_${idx}.jar") + } + + val resultingLibJars1 = ExpandLibJarsGlobs(Array("-libjars", s"${tmpRoot.getAbsolutePath}/*.jar"))(1).split(",") + assert(resultingLibJars1.sorted.toList == jars.sorted.toList) + + val resultingLibJars2 = ExpandLibJarsGlobs(Array("-libjars", s"${tmpRoot.getAbsolutePath}/"))(1).split(",") + assert(resultingLibJars2.sorted.toList == jars.sorted.toList) + + val resultingLibJars3 = ExpandLibJarsGlobs(Array("-libjars", s"${tmpRoot.getAbsolutePath}/*"))(1).split(",") + assert(resultingLibJars3.sorted.toList == jars.sorted.toList) + } + + "Skips over unmatched entries" in { + val tmpRoot = new File(System.getProperty("java.io.tmpdir"), System.currentTimeMillis.toString) + require(tmpRoot.mkdirs(), "Failed to make temporary directory") + tmpRoot.deleteOnExit() + + // Has a side effect, but returns us the jars absolute paths + val jars = (0 until 20).map { idx => + touch(tmpRoot, s"myF_${idx}.jar") + } + + val resultingLibJars1 = ExpandLibJarsGlobs(Array("-libjars", s"${tmpRoot.getAbsolutePath}/*.zip"))(1).split(",").filter(_.nonEmpty) + assert(resultingLibJars1.isEmpty) + } + + } +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala deleted file mode 100644 index 366b5c6676..0000000000 --- a/scalding-core/src/test/scala/com/twitter/scalding/TemplateSourceTest.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2013 Inkling, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.scalding - -import java.io.File -import scala.io.{ Source => ScalaSource } - -import org.scalatest.{ Matchers, WordSpec } - -class TemplateTestJob(args: Args) extends Job(args) { - try { - Tsv("input", ('col1, 'col2)).read.write(TemplatedTsv("base", "%s", 'col1)) - } catch { - case e: Exception => e.printStackTrace() - } -} - -class TemplateSourceTest extends WordSpec with Matchers { - import Dsl._ - "TemplatedTsv" should { - "split output by template" in { - val input = Seq(("A", 1), ("A", 2), ("B", 3)) - - // Need to save the job to allow, find the temporary directory data was written to - var job: Job = null; - def buildJob(args: Args): Job = { - job = new TemplateTestJob(args) - job - } - - JobTest(buildJob(_)) - .source(Tsv("input", ('col1, 'col2)), input) - .runHadoop - .finish - - val testMode = job.mode.asInstanceOf[HadoopTest] - - val directory = new File(testMode.getWritePathFor(TemplatedTsv("base", "%s", 'col1))) - - directory.listFiles().map({ _.getName() }).toSet shouldBe Set("A", "B") - - val aSource = ScalaSource.fromFile(new File(directory, "A/part-00000")) - val bSource = ScalaSource.fromFile(new File(directory, "B/part-00000")) - - aSource.getLines.toList shouldBe Seq("A\t1", "A\t2") - bSource.getLines.toList shouldBe Seq("B\t3") - } - } -} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala index 65e0768335..b907d68316 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala +++ 
b/scalding-core/src/test/scala/com/twitter/scalding/TypedFieldsTest.scala @@ -27,7 +27,7 @@ class TypedFieldsTest extends WordSpec with Matchers { "throw an exception if a field is not comparable" in { val thrown = the[FlowException] thrownBy untypedJob - thrown.getMessage shouldBe "local step failed" + thrown.getMessage should startWith("local step failed") } // Now run the typed fields version diff --git a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala index d5ae569b50..46b25aa8cb 100644 --- a/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala +++ b/scalding-hadoop-test/src/main/scala/com/twitter/scalding/platform/LocalCluster.scala @@ -135,7 +135,6 @@ class LocalCluster(mutex: Boolean = true) { classOf[com.twitter.chill.algebird.AveragedValueSerializer], classOf[com.twitter.algebird.Semigroup[_]], classOf[com.twitter.chill.KryoInstantiator], - classOf[org.jgrapht.ext.EdgeNameProvider[_]], classOf[org.apache.commons.lang.StringUtils], classOf[cascading.scheme.local.TextDelimited], classOf[org.apache.commons.logging.LogFactory], diff --git a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala index c8f27752c6..f70924b11a 100644 --- a/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala +++ b/scalding-hadoop-test/src/test/scala/com/twitter/scalding/platform/PlatformTest.scala @@ -33,11 +33,14 @@ class InAndOutJob(args: Args) extends Job(args) { } object TinyJoinAndMergeJob { - val peopleInput = TypedTsv[Int]("input1") - val peopleData = List(1, 2, 3, 4) + val joinInput1 = TypedTsv[Int]("input1") + val joinData1 = List(1, 2, 3, 4) - val messageInput = TypedTsv[Int]("input2") - val messageData = List(1, 2, 3) + val joinInput2 = TypedTsv[Int]("input2") + val 
joinData2 = List(1, 2, 3) + + val mergerInput = TypedTsv[Int]("input3") + val mergerData = List(1, 2, 3, 4) val output = TypedTsv[(Int, Int)]("output") val outputData = List((1, 2), (2, 2), (3, 2), (4, 1)) @@ -46,13 +49,43 @@ object TinyJoinAndMergeJob { class TinyJoinAndMergeJob(args: Args) extends Job(args) { import TinyJoinAndMergeJob._ - val people = peopleInput.read.mapTo(0 -> 'id) { v: Int => v } + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } + + val joinedData = joinInput2.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, input1) + + val mergerData = mergerInput.read.mapTo(0 -> 'id) { v: Int => v } + + (mergerData ++ joinedData).groupBy('id) { _.size('count) }.write(output) +} + +class TinyJoinAndMergeUnsupportedJob(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } + + val joined = joinInput2.read + .mapTo(0 -> 'id) { v: Int => v } + .joinWithTiny('id -> 'id, input1) + + // merging the output of a hashjoin with one of its inputs is + // no longer supported in cascading3. So we verify we fail here. 
+ (joined ++ input1).groupBy('id) { _.size('count) }.write(output) +} + +class TinyJoinAndMergeForceToDiskJob(args: Args) extends Job(args) { + import TinyJoinAndMergeJob._ + + val input1 = joinInput1.read.mapTo(0 -> 'id) { v: Int => v } - val messages = messageInput.read + val joined = joinInput2.read .mapTo(0 -> 'id) { v: Int => v } - .joinWithTiny('id -> 'id, people) + .joinWithTiny('id -> 'id, input1) + .forceToDisk // workaround for cascading3 - (messages ++ people).groupBy('id) { _.size('count) }.write(output) + // this should work with the forceToDisk workaround + (joined ++ input1).groupBy('id) { _.size('count) }.write(output) } object TsvNoCacheJob { @@ -288,8 +321,35 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest "merge and joinWithTiny shouldn't duplicate data" in { HadoopPlatformJobTest(new TinyJoinAndMergeJob(_), cluster) - .source(peopleInput, peopleData) - .source(messageInput, messageData) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .source(mergerInput, mergerData) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .run + } + } + + "A TinyJoinAndMergeUnsupportedJob" should { + import TinyJoinAndMergeJob._ + + "fail without the forceToDisk workaround" in { + an[cascading.flow.planner.PlannerException] should be thrownBy { + HadoopPlatformJobTest(new TinyJoinAndMergeUnsupportedJob(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) + .sink(output) { _.toSet shouldBe (outputData.toSet) } + .run + } + } + } + + "A TinyJoinAndMergeForceToDiskJob" should { + import TinyJoinAndMergeJob._ + + "run correctly with forceToDisk workaround" in { + HadoopPlatformJobTest(new TinyJoinAndMergeForceToDiskJob(_), cluster) + .source(joinInput1, joinData1) + .source(joinInput2, joinData2) .sink(output) { _.toSet shouldBe (outputData.toSet) } .run } @@ -343,13 +403,12 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest .inspectCompletedFlow { flow 
=> val steps = flow.getFlowSteps.asScala steps should have size 1 - val firstStep = steps.headOption.map(_.getConfig.get(Config.StepDescriptions)).getOrElse("") - val lines = List(147, 150, 154).map { i => - s"com.twitter.scalding.platform.TypedPipeJoinWithDescriptionJob.(PlatformTest.scala:$i" - } - firstStep should include ("leftJoin") - firstStep should include ("hashJoin") - lines.foreach { l => firstStep should include (l) } + val firstStepDescs = steps.headOption.map(_.getConfig.get(Config.StepDescriptions)).getOrElse("") + val firstStepDescSet = firstStepDescs.split(",").map(_.trim).toSet + + val expected = Set(180, 183, 187, 182, 186).map(linenum => /* WARNING: keep aligned with line numbers above */ + s"com.twitter.scalding.platform.TypedPipeJoinWithDescriptionJob.(PlatformTest.scala:${linenum})") ++ Seq("leftJoin", "hashJoin") + firstStepDescSet should equal(expected) steps.map(_.getConfig.get(Config.StepDescriptions)).foreach(s => info(s)) } .run @@ -361,18 +420,16 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest HadoopPlatformJobTest(new TypedPipeWithDescriptionJob(_), cluster) .inspectCompletedFlow { flow => val steps = flow.getFlowSteps.asScala - val descs = List("map stage - assign words to 1", + val expectedDescs = Set("map stage - assign words to 1", "reduce stage - sum", - "write", - // should see the .group and the .write show up as line numbers - "com.twitter.scalding.platform.TypedPipeWithDescriptionJob.(PlatformTest.scala:137)", - "com.twitter.scalding.platform.TypedPipeWithDescriptionJob.(PlatformTest.scala:141)") - - val foundDescs = steps.map(_.getConfig.get(Config.StepDescriptions)) - descs.foreach { d => - assert(foundDescs.size == 1) - assert(foundDescs(0).contains(d)) - } + "write") ++ + Seq(169, 170, 172, 173, 174).map( /* WARNING: keep aligned with line numbers above */ + linenum => s"com.twitter.scalding.platform.TypedPipeWithDescriptionJob.(PlatformTest.scala:${linenum})") + + val foundDescs = 
steps.map(_.getConfig.get(Config.StepDescriptions).split(",").map(_.trim).toSet) + foundDescs should have size 1 + + foundDescs.head should equal(expectedDescs) //steps.map(_.getConfig.get(Config.StepDescriptions)).foreach(s => info(s)) } .run diff --git a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala index 0e7b0ffa68..083084d091 100644 --- a/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala +++ b/scalding-hraven/src/main/scala/com/twitter/scalding/hraven/reducer_estimation/HRavenHistoryService.scala @@ -120,7 +120,7 @@ object HRavenHistoryService extends HistoryService { */ def fetchPastJobDetails(step: FlowStep[JobConf], max: Int): Try[Seq[JobDetails]] = { val conf = step.getConfig - val stepNum = step.getStepNum + val stepNum = step.getID def findMatchingJobStep(pastFlow: Flow) = pastFlow.getJobs.asScala.find { step => diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala index 3e06b8e107..f604dfb421 100644 --- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala +++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/scheme/TypedParquetTupleScheme.scala @@ -140,14 +140,14 @@ class TypedParquetTupleScheme[T](val readSupport: ParquetReadSupport[T], val wri type SourceCallType = SourceCall[Array[AnyRef], Reader] type SinkCallType = SinkCall[Array[AnyRef], Output] - override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: TapType, jobConf: JobConf): Unit = { + override def sourceConfInit(flowProcess: FlowProcess[_ <: JobConf], tap: TapType, jobConf: JobConf): Unit = { 
fp.map(ParquetInputFormat.setFilterPredicate(jobConf, _)) jobConf.setInputFormat(classOf[DeprecatedParquetInputFormat[T]]) jobConf.set(ParquetInputOutputFormat.READ_SUPPORT_INSTANCE, ParquetInputOutputFormat.injection(readSupport)) ParquetInputFormat.setReadSupportClass(jobConf, classOf[ReadSupportInstanceProxy[_]]) } - override def source(flowProcess: FlowProcess[JobConf], sc: SourceCallType): Boolean = { + override def source(flowProcess: FlowProcess[_ <: JobConf], sc: SourceCallType): Boolean = { val value: Container[T] = sc.getInput.createValue() val hasNext = sc.getInput.next(null, value) @@ -161,12 +161,12 @@ class TypedParquetTupleScheme[T](val readSupport: ParquetReadSupport[T], val wri } } - override def sinkConfInit(flowProcess: FlowProcess[JobConf], tap: TapType, jobConf: JobConf): Unit = { + override def sinkConfInit(flowProcess: FlowProcess[_ <: JobConf], tap: TapType, jobConf: JobConf): Unit = { jobConf.setOutputFormat(classOf[InnerDeprecatedParquetOutputFormat[T]]) jobConf.set(ParquetInputOutputFormat.WRITE_SUPPORT_INSTANCE, ParquetInputOutputFormat.injection(writeSupport)) } - override def sink(flowProcess: FlowProcess[JobConf], sinkCall: SinkCallType): Unit = { + override def sink(flowProcess: FlowProcess[_ <: JobConf], sinkCall: SinkCallType): Unit = { val tuple = sinkCall.getOutgoingEntry require(tuple.size == 1, "TypedParquetTupleScheme expects tuple with an arity of exactly 1, but found " + tuple.getFields)