From 93096c327415a5bbc438161ac5bf7efd8d1c1011 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 11 May 2014 19:46:07 -0400 Subject: [PATCH] Initial commit of bringing back map join hints. --- .../optimizer/SharkMapJoinProcessor.scala | 60 +++++++++++++++++++ .../{ => optimizer}/SharkOptimizer.scala | 16 +++-- .../shark/parse/SharkSemanticAnalyzer.scala | 3 +- 3 files changed, 74 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala rename src/main/scala/shark/{ => optimizer}/SharkOptimizer.scala (75%) diff --git a/src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala b/src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala new file mode 100644 index 00000000..72aab228 --- /dev/null +++ b/src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2012 The Regents of The University California. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package shark.optimizer + +import java.util.{LinkedHashMap => JavaLinkedHashMap} + +import org.apache.hadoop.hive.ql.exec.{MapJoinOperator, JoinOperator, Operator} +import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor +import org.apache.hadoop.hive.ql.parse.{ParseContext, QBJoinTree, OpParseContext} +import org.apache.hadoop.hive.ql.plan.OperatorDesc +import org.apache.hadoop.hive.conf.HiveConf + +class SharkMapJoinProcessor extends MapJoinProcessor { + + /** + * Override generateMapJoinOperator to bypass the step of validating Map Join hints in Hive. + */ + override def generateMapJoinOperator( + pctx: ParseContext, + op: JoinOperator, + joinTree: QBJoinTree, + mapJoinPos: Int): MapJoinOperator = { + val hiveConf: HiveConf = pctx.getConf + val noCheckOuterJoin: Boolean = + HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN) && + HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN) + + val opParseCtxMap: JavaLinkedHashMap[Operator[_ <: OperatorDesc], OpParseContext] = + pctx.getOpParseCtx + + // Explicitly set validateMapJoinTree to false to bypass the step of validating + // Map Join hints in Hive. + val validateMapJoinTree = false + val mapJoinOp: MapJoinOperator = + MapJoinProcessor.convertMapJoin( + opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree) + + // Hive originally uses genSelectPlan to insert a dummy select after the MapJoinOperator. + // We should not need this step. 
+ // create a dummy select to select all columns + // MapJoinProcessor.genSelectPlan(pctx, mapJoinOp) + + return mapJoinOp + } +} diff --git a/src/main/scala/shark/SharkOptimizer.scala b/src/main/scala/shark/optimizer/SharkOptimizer.scala similarity index 75% rename from src/main/scala/shark/SharkOptimizer.scala rename to src/main/scala/shark/optimizer/SharkOptimizer.scala index e36184fd..08f649d1 100644 --- a/src/main/scala/shark/SharkOptimizer.scala +++ b/src/main/scala/shark/optimizer/SharkOptimizer.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2012 The Regents of The University California. + * Copyright (C) 2012 The Regents of The University California. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,14 +15,15 @@ * limitations under the License. */ -package shark +package shark.optimizer import java.util.{List => JavaList} import org.apache.hadoop.hive.ql.optimizer.JoinReorder import org.apache.hadoop.hive.ql.optimizer.{Optimizer => HiveOptimizer, - SimpleFetchOptimizer, Transform} -import org.apache.hadoop.hive.ql.parse.{ParseContext} + SimpleFetchOptimizer, Transform, MapJoinProcessor => HiveMapJoinProcessor} +import org.apache.hadoop.hive.ql.parse.ParseContext +import shark.LogHelper class SharkOptimizer extends HiveOptimizer with LogHelper { @@ -49,6 +50,13 @@ class SharkOptimizer extends HiveOptimizer with LogHelper { transformation match { case _: SimpleFetchOptimizer => {} case _: JoinReorder => {} + case _: HiveMapJoinProcessor => { + // Use SharkMapJoinProcessor to bypass the step of validating Map Join hints + // in Hive. So, we can use hints to mark tables that will be considered as small + // tables (like Hive 0.9). 
+ val sharkMapJoinProcessor = new SharkMapJoinProcessor + pctx = sharkMapJoinProcessor.transform(pctx) + } case _ => { pctx = transformation.transform(pctx) } diff --git a/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala b/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala index 7f3a3fd2..677da200 100755 --- a/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala +++ b/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala @@ -38,11 +38,12 @@ import org.apache.hadoop.hive.ql.parse._ import org.apache.hadoop.hive.ql.plan._ import org.apache.hadoop.hive.ql.session.SessionState -import shark.{LogHelper, SharkConfVars, SharkOptimizer} +import shark.{LogHelper, SharkConfVars} import shark.execution.{HiveDesc, Operator, OperatorFactory, ReduceSinkOperator} import shark.execution.{SharkDDLWork, SparkLoadWork, SparkWork, TerminalOperator} import shark.memstore2.{CacheType, LazySimpleSerDeWrapper, MemoryMetadataManager} import shark.memstore2.SharkTblProperties +import shark.optimizer.SharkOptimizer /**