From a4371cfaf6672fc67cc2961e23f241dda49314f8 Mon Sep 17 00:00:00 2001 From: XiaoBang Date: Wed, 20 Jun 2018 08:49:24 +0800 Subject: [PATCH 1/3] use alluxio to improve stability of shuffle --- .../apache/spark/AlluxioManagedBuffer.java | 151 +++++++++ .../sort/BypassMergeSortShuffleWriter.java | 7 + .../scala/org/apache/spark/SparkContext.scala | 4 + .../CoarseGrainedSchedulerBackend.scala | 6 +- .../shuffle/IndexShuffleBlockResolver.scala | 6 + .../shuffle/sort/SortShuffleWriter.scala | 9 +- .../spark/storage/AlluxioBlockManager.scala | 300 ++++++++++++++++++ .../storage/AlluxioBlockManagerMaster.scala | 63 ++++ .../apache/spark/storage/BlockManager.scala | 5 + .../spark/storage/BlockManagerMaster.scala | 9 + .../spark/storage/ExternalBlockManager.scala | 145 +++++++++ .../spark/storage/ExternalBlockStore.scala | 68 ++++ .../storage/ShuffleBlockFetcherIterator.scala | 47 ++- 13 files changed, 811 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java create mode 100644 core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala diff --git a/core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java b/core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java new file mode 100644 index 0000000000000..339174484036c --- /dev/null +++ b/core/src/main/java/org/apache/spark/AlluxioManagedBuffer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + + +import com.google.common.base.Objects; +import com.google.common.io.ByteStreams; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.LimitedInputStream; +import org.apache.spark.network.util.TransportConf; +import org.apache.spark.storage.ExternalBlockManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.Math.toIntExact; + +/** + * A {@link ManagedBuffer} backed by a segment in a file. 
+ */ +public final class AlluxioManagedBuffer extends ManagedBuffer { + private static final Logger logger = LoggerFactory.getLogger(TransportContext.class); + + private final TransportConf conf; + private final Path file; + private final long offset; + private final long length; + private final ExternalBlockManager blockManager; + + public AlluxioManagedBuffer(TransportConf conf, Path file, long offset, long length, ExternalBlockManager blockManager) { + this.conf = conf; + this.file = file; + this.offset = offset; + this.length = length; + this.blockManager = blockManager; + } + + @Override + public long size() { + return length; + } + + @Override + public ByteBuffer nioByteBuffer() throws IOException { + FileSystem fs = blockManager.fs(); + if (!fs.exists(file)) { + return ByteBuffer.wrap(new byte[0]); + } else { + long size = fs.getFileStatus(file).getLen(); + if (size == 0) { + return ByteBuffer.wrap(new byte[0]); + } else { + InputStream input = fs.open(file); + try { + byte[] buffer = new byte[toIntExact(size)]; + ByteStreams.readFully(input, buffer); + return ByteBuffer.wrap(buffer,toIntExact(offset),toIntExact(length)); + } catch (IOException e){ + logger.info("Test-log: Failed to get bytes of block $blockId from Alluxio", e); + return ByteBuffer.wrap(new byte[0]); + } finally { + input.close(); + } + } + } + } + + @Override + public InputStream createInputStream() throws IOException { + + logger.info("Test-log: use function createInputStream"); + InputStream is = blockManager.createInputStream(file).get(); + logger.info("Test-log: create inputStream successfully"); + try { + + ByteStreams.skipFully(is, offset); + logger.info("Test-log: read data from inputstream from offset " + offset + " and size is " + length ); + return new LimitedInputStream(is, length); + } catch (IOException e) { + try { + if (is != null) { + long size = blockManager.fs().getFileStatus(file).getLen(); + throw new IOException("Error in reading " + this + " (actual file length " + size + ")", + e); + } + } catch (IOException ignored) { + // ignore + } finally { + JavaUtils.closeQuietly(is); + } + throw new IOException("Error in opening " + this, e); + } catch (RuntimeException e) { + JavaUtils.closeQuietly(is); + throw e; + } + } + + @Override + public ManagedBuffer retain() { + return this; + } + + @Override + public ManagedBuffer release() { + return this; + } + + @Override + public Object convertToNetty() throws IOException { + return new Object(); + } + + public Path getFile() { return file; } + + public long getOffset() { return offset; } + + public long getLength() { return length; } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("file", file) + .add("offset", offset) + .add("length", length) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 323a5d3c52831..8df8c16e26c96 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -90,6 +90,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { private FileSegment[] partitionWriterSegments; @Nullable private MapStatus mapStatus; private long[] partitionLengths; + private boolean useAlluxio; /** * Are we in the process of stopping? 
Because map tasks can call stop() with success = true @@ -108,6 +109,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); + this.useAlluxio = conf.getBoolean("spark.alluxio.shuffle.enabled",false); this.blockManager = blockManager; final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; @@ -162,6 +164,11 @@ public void write(Iterator> records) throws IOException { try { partitionLengths = writePartitionedFile(tmp); shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); + if(useAlluxio) { + logger.info("shuffle block id " + new ShuffleDataBlockId(shuffleId, mapId, 0).name() + "\nfile path is " + + output.getPath() + "\nfile name is " + output.getName() + "file size is " + output.length()); + blockManager.externalBlockStore().externalBlockManager().get().putFile(shuffleId, new ShuffleDataBlockId(shuffleId, mapId, 0), output); + } } finally { if (tmp.exists() && !tmp.delete()) { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b2a26c51d4de1..e7053b88700e1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -514,6 +514,10 @@ class SparkContext(config: SparkConf) extends Logging { _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) + if (_conf.getBoolean("spark.alluxio.shuffle.enabled", false)){ + logInfo("Test-log: Init AlluxioBlockMaster") + _env.blockManager.master.init() + } if (_conf.getBoolean("spark.ui.reverseProxy", false)) { System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0b396b794ddce..1911ea330a5e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -361,8 +361,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - logInfo(s"Disabling executor $executorId.") - scheduler.executorLost(executorId, LossReasonPending) + if(SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled",false)) + scheduler.executorLost(executorId, ExecutorKilled) + else + scheduler.executorLost(executorId, LossReasonPending) } shouldDisable diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 15540485170d0..855837a2f5e5d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -182,6 +182,12 @@ private[spark] class IndexShuffleBlockResolver( if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) } + 
if(SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled",false)){ + logInfo("Testlog: write index file" + indexFile.getName + " indexfile blockid " + + ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) + blockManager.externalBlockStore.externalBlockManager.get.putFile(shuffleId, ShuffleIndexBlockId(shuffleId, + mapId, NOOP_REDUCE_ID), indexFile) + } } } } finally { diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 636b88e792bf3..8f6642d058c25 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -21,7 +21,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter} -import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId} import org.apache.spark.util.Utils import org.apache.spark.util.collection.ExternalSorter @@ -71,6 +71,13 @@ private[spark] class SortShuffleWriter[K, V, C]( val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) + if (SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + if (output.length() != 0) { + blockManager.externalBlockStore.externalBlockManager.get.putFile( + dep.shuffleId, ShuffleDataBlockId(dep.shuffleId, mapId, + IndexShuffleBlockResolver.NOOP_REDUCE_ID), output) + } + } mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala new file mode 100644 index 0000000000000..8169be17bd549 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala @@ -0,0 +1,300 @@ +package org.apache.spark.storage + +/** + * Created by Chopin on 2017/12/22. 
+ */ + +import java.io.{File, FileInputStream, InputStream} +import java.nio.ByteBuffer + +import com.google.common.io.ByteStreams +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ByteBufferInputStream, Utils} + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +/** + * @see org.apache.spark.storage.TachyonBlockManager + */ +private[spark] class AlluxioBlockManager extends ExternalBlockManager with Logging { + + private var chroot: Path = _ + private var subDirs: Array[Array[Path]] = _ + private var hdfsDirs: Array[Array[Path]] = _ + private var hdfsroot: Path = _ + private var usehdfs: Boolean = _ + + override def toString = "ExternalBlockStore-Alluxio" + + override def init(blockManager: BlockManager): Unit = { + super.init(blockManager) + + val conf = blockManager.conf + + val masterUrl = conf.get(ExternalBlockStore.MASTER_URL, "alluxio://localhost:19998") + val hdfsUrl = conf.get("spark.alluxio.hdfs.nameservice", "viewfs://nsX") + val storeDir = conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_alluxio") + val folderName = conf.get(ExternalBlockStore.FOLD_NAME) + val subDirsPerAlluxio = conf.getInt( + "spark.externalBlockStore.subDirectories", + ExternalBlockStore.SUB_DIRS_PER_DIR.toInt) + + val master = new Path(masterUrl) + chroot = new Path(master, s"$storeDir/$folderName/" + conf.getAppId ) + fs = master.getFileSystem(new Configuration) + + val hdfsMaster = new Path(hdfsUrl) + hdfsFs = hdfsMaster.getFileSystem(new Configuration) + hdfsroot = new Path(hdfsMaster, s"$storeDir/$folderName/" + conf.getAppId) + + subDirs = Array.fill(subDirsPerAlluxio)(new Array[Path](subDirsPerAlluxio)) + hdfsDirs = Array.fill(subDirsPerAlluxio)(new Array[Path](subDirsPerAlluxio)) + + } + + override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit ={ + val path = getFile(blockId) + val output = fs.create(path, true) + try { + output.write(bytes.array()) + } catch { + case NonFatal(e) => + logWarning(s"Failed to put values of block $blockId into Alluxio", e) + } finally { + try { + output.close() + } catch { + case NonFatal(e) => + logWarning(s"Failed to close output when put bytes to Alluxion", e) + } + } + } + + /** + * this method must be override, alluxio not support write append. 
+ */ + override def putValues(blockId: BlockId, values: Iterator[_]): Unit = { + val output = fs.create(getFile(blockId), true) + try { + blockManager.serializerManager.dataSerializeStream(blockId, output, values) + } catch { + case NonFatal(e) => + logWarning(s"Failed to put values of block $blockId into Alluxio", e) + } finally { + try { + output.close() + } catch { + case NonFatal(e) => + logWarning(s"Failed to close output when put bytes to Alluxion", e) + } + } + } + + override def putFile(shuffleId : Int, blockId: BlockId, file : File): Unit ={ + + val in = new FileInputStream(file) + val bytes = new Array[Byte](file.length().toInt) + in.read(bytes) + in.close() + putBytes(blockId, ByteBuffer.wrap(bytes)) + } + + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val path = getFile(blockId) + if (!fs.exists(path)) { + logInfo("Test-log path is " + path.getName + "is not exsist ") + None + } else { + val size = fs.getFileStatus(path).getLen + if (size == 0) { + logInfo("Test-log path size is " + size ) + None + } else { + val input = fs.open(path) + var flag = true + try { + val buffer = new Array[Byte](size.toInt) + ByteStreams.readFully(input, buffer) + Some(ByteBuffer.wrap(buffer)) + } catch { + case _ => { + logWarning(s"Failed to get bytes of block $blockId from Alluxio") + flag = false + getBytesFromHdfs(blockId) + } + } finally { + if(flag){ + try { + input.close() + } catch { + case NonFatal(e) => + logWarning(s"Failed to close input when get bytes to Alluxion", e) + } + } + } + } + } + } + + def getBytesFromHdfs(blockId: BlockId): Option[ByteBuffer] = { + usehdfs = true + val path = changePathToHDFS(getFile(blockId)) + if (!hdfsFs.exists(path)) { + logInfo("Test-log path is " + path.getName + "is not exsist ") + None + } else { + val size = hdfsFs.getFileStatus(path).getLen + if (size == 0) { + logInfo("Test-log path size is " + size ) + None + } else { + val input = hdfsFs.open(path) + try { + val buffer = new Array[Byte](size.toInt) + ByteStreams.readFully(input, buffer) + Some(ByteBuffer.wrap(buffer)) + } catch { + case NonFatal(e) => { + logWarning(s"Failed to get bytes of block $blockId from HDFS", e) + None + } + } finally { + try { + input.close() + } catch { + case NonFatal(e) => + logWarning(s"Failed to close input when get bytes to Alluxion", e) + } + } + } + } + } + + override def createInputStream(path: Path): Option[InputStream] = { + if(usehdfs){ + logInfo("Test-log Use HDFS createInputStream") + createInputStreamFromHDFS(path) + } + else{ + logInfo("Test-log Use Alluxio createInputStream") + createInputStreamFromAlluxio(path) + } + } + + def createInputStreamFromAlluxio(path: Path): Option[InputStream] = { + + if (!fs.exists(path)) { + logInfo("Test-log path is " + path.toString + "is not exsist ") + None + } else { + val size = fs.getFileStatus(path).getLen + if (size == 0) { + logInfo("Test-log path size is " + size ) + None + } else { + try { + val input = fs.open(path) + Some(input) + } catch { + case NonFatal(e) => + logInfo(s"Failed to createInputStream $path from Alluxio", e) + None + } + } + } + } + + def createInputStreamFromHDFS(path: Path): Option[InputStream] = { + val hdfsPath = changePathToHDFS(path) + if (!hdfsFs.exists(hdfsPath)) { + logInfo("Test-log HDFS path is " + hdfsPath.toString + "is not exsist ") + None + } else { + val size = hdfsFs.getFileStatus(hdfsPath).getLen + if (size == 0) { + logInfo("Test-log path size is " + size ) + None + } else { + try { + val input = hdfsFs.open(hdfsPath) + Some(input) + } catch { + case 
NonFatal(e) => + logInfo(s"Failed to createInputStream $hdfsPath from HDFS", e) + None + } + } + } + } + + def changePathToHDFS(path: Path): Path = { + val pathName = path.getName + getFileFromHDFS(BlockId(pathName)) + } + + override def getSize(path: Path): Long = { + val size = if(!fs.exists(path)){ + fs.getFileStatus(path).getLen + }else 0L + size + } + + override def getValues(blockId: BlockId): Option[Iterator[_]] = { + val ct = implicitly[ClassTag[Iterator[_]]] + val bytes: Option[ByteBuffer] = getBytes(blockId) + bytes.map(bs => + // alluxio.hadoop.HdfsFileInputStream#available unsupport! + // blockManager.dataDeserialize(blockId, input) + blockManager.serializerManager.dataDeserializeStream(blockId, inputStream = new ByteBufferInputStream(bs))(ct) + ) + } + + override def getSize(blockId: BlockId): Long = + fs.getFileStatus(getFile(blockId)).getLen + + override def blockExists(blockId: BlockId): Boolean = + fs.exists(getFile(blockId)) + + override def removeBlock(blockId: BlockId): Boolean = + fs.delete(getFile(blockId), false) + + override def shutdown(): Unit = { + hdfsFs.close() + } + + override def getFile(blockId: BlockId): Path = getFile(blockId.name) + + def getFile(filename: String): Path = { + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % subDirs.length + val subDirId = hash % subDirs.length + + val subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) + old + else { + val path = new Path(chroot, s"$dirId/"+subDirId.toString) + subDirs(dirId)(subDirId) = path + hdfsDirs(dirId)(subDirId) = new Path(hdfsroot, s"$dirId/"+subDirId.toString) + path + } + } + new Path(subDir, filename) + } + + def getFileFromHDFS(blockId: BlockId): Path = { + val hash = Utils.nonNegativeHash(blockId.name) + val dirId = hash % subDirs.length + val subDirId = hash % subDirs.length + + val subDir = hdfsDirs(dirId)(subDirId) + + new Path(subDir, blockId.name) + } + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala new file mode 100644 index 0000000000000..4f2ddf23faf72 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala @@ -0,0 +1,63 @@ +package org.apache.spark.storage + +/** + * Created by Chopin on 2018/3/27. 
+ */ + + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * @see org.apache.spark.storage.TachyonBlockManager + */ +private[spark] class AlluxioBlockManagerMaster extends Logging { + + private var chroot: Path = _ + private var fs : FileSystem = _ + + override def toString = "ExternalBlockStore-Alluxio" + + def init(conf: SparkConf): Unit = { + + val masterUrl = conf.get(ExternalBlockStore.MASTER_URL, "alluxio://localhost:19998") + val storeDir = conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_alluxio") + val subDirsPerAlluxio = conf.getInt( + "spark.externalBlockStore.subDirectories", + ExternalBlockStore.SUB_DIRS_PER_DIR.toInt) + val folderName = conf.get(ExternalBlockStore.FOLD_NAME) + + + val master = new Path(masterUrl) + chroot = new Path(master, s"$storeDir/$folderName/" + conf.getAppId ) + fs = master.getFileSystem(new Configuration) + if(!fs.exists(chroot)) + fs.mkdirs(chroot) + mkdir(subDirsPerAlluxio) + } + + def delete(): Unit ={ + try{ + if(fs.exists(chroot)){ + logInfo(s"Test log: delete $chroot") + fs.delete(chroot,true) + } + }catch { + case _ => + logWarning(s"$chroot has been deleted") + }finally + fs.close() + } + + def mkdir(n: Int): Unit = { + for( i <- 0 until n ){ + val path = new Path(chroot,i.toString+s"/$i") + if(!fs.exists(path)) + fs.mkdirs(path) + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index adbe3cfd89ea6..24cfaf6cb1a06 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -138,11 +138,16 @@ private[spark] class BlockManager( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) // Actual storage of where blocks are kept + private var externalBlockStoreInitialized = false private[spark] val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this) private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager) memoryManager.setMemoryStore(memoryStore) + private[spark] lazy val externalBlockStore: ExternalBlockStore = { + externalBlockStoreInitialized = true + new ExternalBlockStore(this) + } // Note: depending on the memory manager, `maxMemory` may actually vary over time. // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxMemory` can ever possibly reach. We may need diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ea5d8423a588c..0e6cd2883b1b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -35,6 +35,11 @@ class BlockManagerMaster( extends Logging { val timeout = RpcUtils.askRpcTimeout(conf) + lazy val alluxioBlockManagerMaster = new AlluxioBlockManagerMaster + + def init(): Unit = { + alluxioBlockManagerMaster.init(conf) + } /** Remove a dead executor from the driver endpoint. This is only called on the driver side. 
*/ def removeExecutor(execId: String) { @@ -232,6 +237,10 @@ class BlockManagerMaster( if (driverEndpoint != null && isDriver) { tell(StopBlockManagerMaster) driverEndpoint = null + if(conf.getBoolean("spark.alluxio.shuffle.enabled",false)){ + logInfo("Test log: AlluxioBlockManagerMaster delete tmp file") + alluxioBlockManagerMaster.delete() + } logInfo("BlockManagerMaster stopped") } } diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala new file mode 100644 index 0000000000000..38430cccea3d0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala @@ -0,0 +1,145 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage +import java.io.{File, InputStream} +import java.nio.ByteBuffer + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.util.ByteBufferInputStream + +import scala.reflect.ClassTag + +/** + * An abstract class that the concrete external block manager has to inherit. + * The class has to have a no-argument constructor, and will be initialized by init, + * which is invoked by ExternalBlockStore. The main input parameter is blockId for all + * the methods, which is the unique identifier for Block in one Spark application. + * + * The underlying external block manager should avoid any name space conflicts among multiple + * Spark applications. For example, creating different directory for different applications + * by randomUUID + * + */ +private[spark] abstract class ExternalBlockManager { + + protected var blockManager: BlockManager = _ + protected var serializerManager : SerializerManager = _ + + var fs: FileSystem = _ + var hdfsFs: FileSystem = _ + + + override def toString: String = {"External Block Store"} + + /** + * Initialize a concrete block manager implementation. Subclass should initialize its internal + * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore + * right after the class is constructed. The function should throw IOException on failure + * + * @throws java.io.IOException if there is any file system failure during the initialization. + */ + def init(blockManager: BlockManager): Unit = { + this.blockManager = blockManager + this.serializerManager = blockManager.serializerManager + } + + /** + * Drop the block from underlying external block store, if it exists.. + * @return true on successfully removing the block + * false if the block could not be removed as it was not found + * + * @throws java.io.IOException if there is any file system failure in removing the block. 
+ */ + def removeBlock(blockId: BlockId): Boolean + + /** + * Used by BlockManager to check the existence of the block in the underlying external + * block store. + * @return true if the block exists. + * false if the block does not exists. + * + * @throws java.io.IOException if there is any file system failure in checking + * the block existence. + */ + def blockExists(blockId: BlockId): Boolean + + /** + * Put the given block to the underlying external block store. Note that in normal case, + * putting a block should never fail unless something wrong happens to the underlying + * external block store, e.g., file system failure, etc. In this case, IOException + * should be thrown. + * + * @throws java.io.IOException if there is any file system failure in putting the block. + */ + def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit + + def putValues(blockId: BlockId, values: Iterator[_]): Unit = { + val bytes = serializerManager.dataSerialize(blockId, values) + putBytes(blockId, bytes.toByteBuffer) + } + + def putFile(blockId: BlockId, file : File): Unit = {} + def putFile(shuffleId: Int, blockId: BlockId, file : File): Unit = {} + /** + * Retrieve the block bytes. + * @return Some(ByteBuffer) if the block bytes is successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ + def getBytes(blockId: BlockId): Option[ByteBuffer] + + /** + * Retrieve the block data. + * @return Some(Iterator[Any]) if the block data is successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ + def getValues(blockId: BlockId): Option[Iterator[_]] = { + val ct = implicitly[ClassTag[Iterator[_]]] + getBytes(blockId).map(buffer => serializerManager.dataDeserializeStream(blockId, new ByteBufferInputStream(buffer))(ct)) + } + + def createInputStream(path: Path): Option[InputStream] + + def getSize(path: Path): Long = 0L + + /** + * Get the size of the block saved in the underlying external block store, + * which is saved before by putBytes. + * @return size of the block + * 0 if the block does not exist + * + * @throws java.io.IOException if there is any file system failure in getting the block size. + */ + def getSize(blockId: BlockId): Long + + /** + * Clean up any information persisted in the underlying external block store, + * e.g., the directory, files, etc,which is invoked by the shutdown hook of ExternalBlockStore + * during system shutdown. + * + */ + def shutdown() + + def delete(): Unit = {} + + def getFile(blockId: BlockId): Path +} diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala new file mode 100644 index 0000000000000..637af57b4c578 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ShutdownHookManager, Utils} + +import scala.util.control.NonFatal + + +/** + * Stores BlockManager blocks on ExternalBlockStore. + * We capture any potential exception from underlying implementation + * and return with the expected failure value + */ +private[spark] class ExternalBlockStore(blockManager: BlockManager) extends Logging { + + lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager() + + logInfo("ExternalBlockStore started") + + // Create concrete block manager and fall back to Tachyon by default for backward compatibility. + private def createBlkManager(): Option[ExternalBlockManager] = { + val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME) + .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME) + + try { + val instance = Utils.classForName(clsName) + .newInstance() + .asInstanceOf[ExternalBlockManager] + instance.init(blockManager) + ShutdownHookManager.addShutdownHook { () => + logDebug("Shutdown hook called") + externalBlockManager.map(_.shutdown()) + } + Some(instance) + } catch { + case NonFatal(t) => + logError("Cannot initialize external block store", t) + None + } + } +} + +private[spark] object ExternalBlockStore extends Logging { + val MAX_DIR_CREATION_ATTEMPTS = 10 + val SUB_DIRS_PER_DIR = "64" + val BASE_DIR = "spark.externalBlockStore.baseDir" + val FOLD_NAME = "spark.externalBlockStore.folderName" + val MASTER_URL = "spark.externalBlockStore.url" + val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager" + val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.AlluxioBlockManager" +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index a10f1feadd0af..5f9c226aac530 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{File, InputStream, IOException} +import java.io.{DataInputStream, File, InputStream, IOException} import java.nio.ByteBuffer import java.util.concurrent.LinkedBlockingQueue import javax.annotation.concurrent.GuardedBy @@ -25,12 +25,16 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashSet, Queue} -import org.apache.spark.{SparkException, TaskContext} +import com.google.common.io.ByteStreams + +import org.apache.spark.{AlluxioManagedBuffer, SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient} -import 
org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.util.Utils +import org.apache.spark.shuffle.{FetchFailedException, IndexShuffleBlockResolver} +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ChunkedByteBufferOutputStream /** @@ -212,8 +216,15 @@ final class ShuffleBlockFetcherIterator( } override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { - logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) - results.put(new FailureFetchResult(BlockId(blockId), address, e)) + + if (!SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) + results.put(new FailureFetchResult(BlockId(blockId), address, e)) + } else { + results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), + getAlluxioBlocks(BlockId(blockId).asInstanceOf[ShuffleBlockId]), + remainingBlocks.isEmpty)) + } } } @@ -233,6 +244,30 @@ final class ShuffleBlockFetcherIterator( } } + private[this] def getAlluxioBlocks(blockId: ShuffleBlockId): ManagedBuffer = { + + val indexFileByteBuffer = blockManager.externalBlockStore.externalBlockManager.get + .getBytes(ShuffleIndexBlockId(blockId.shuffleId, blockId.mapId, + IndexShuffleBlockResolver.NOOP_REDUCE_ID)) + val in = new DataInputStream(new ByteBufferInputStream(indexFileByteBuffer.get)) + try { + ByteStreams.skipFully(in, blockId.reduceId * 8) + val offset = in.readLong() + val nextOffset = in.readLong() + new AlluxioManagedBuffer( + SparkTransportConf.fromSparkConf(blockManager.conf, "shuffle"), + blockManager.externalBlockStore.externalBlockManager.get.getFile( + ShuffleDataBlockId(blockId.shuffleId, blockId.mapId, + IndexShuffleBlockResolver.NOOP_REDUCE_ID)), + offset, + nextOffset - offset, + blockManager.externalBlockStore.externalBlockManager.get + ) + } finally { + in.close() + } + } + private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 From 65659882839dc626e86f1d3dd73544eb2c28178b Mon Sep 17 00:00:00 2001 From: xiaobang213452 Date: Mon, 6 Aug 2018 13:12:31 +0800 Subject: [PATCH 2/3] update style --- .../scala/org/apache/spark/SparkContext.scala | 3 +- .../CoarseGrainedSchedulerBackend.scala | 5 +- .../shuffle/IndexShuffleBlockResolver.scala | 8 +- .../spark/storage/AlluxioBlockManager.scala | 81 +++++----- .../storage/AlluxioBlockManagerMaster.scala | 55 ++++--- .../spark/storage/BlockManagerMaster.scala | 3 +- .../spark/storage/ExternalBlockManager.scala | 151 +++++++++--------- .../spark/storage/ExternalBlockStore.scala | 17 +- 8 files changed, 164 insertions(+), 159 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e7053b88700e1..6c8a3600dec81 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -514,8 +514,7 @@ class SparkContext(config: SparkConf) extends Logging { _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) - if (_conf.getBoolean("spark.alluxio.shuffle.enabled", false)){ - logInfo("Test-log: Init AlluxioBlockMaster") + if (_conf.getBoolean("spark.alluxio.shuffle.enabled", 
false)) { _env.blockManager.master.init() } if (_conf.getBoolean("spark.ui.reverseProxy", false)) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1911ea330a5e2..d73dc8f0884d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -361,10 +361,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - if(SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled",false)) + if (SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { scheduler.executorLost(executorId, ExecutorKilled) - else + } else { scheduler.executorLost(executorId, LossReasonPending) + } } shouldDisable diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index 855837a2f5e5d..37aeb5885d9c6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -182,11 +182,9 @@ private[spark] class IndexShuffleBlockResolver( if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) { throw new IOException("fail to rename file " + dataTmp + " to " + dataFile) } - if(SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled",false)){ - logInfo("Testlog: write index file" + indexFile.getName + " indexfile blockid " - + ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) - blockManager.externalBlockStore.externalBlockManager.get.putFile(shuffleId, ShuffleIndexBlockId(shuffleId, - mapId, NOOP_REDUCE_ID), indexFile) + if (SparkEnv.get.conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { + blockManager.externalBlockStore.externalBlockManager.get.putFile(shuffleId, + ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), indexFile) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala index 8169be17bd549..ca6c8742f0e0f 100644 --- a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala @@ -1,24 +1,35 @@ -package org.apache.spark.storage +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -/** - * Created by Chopin on 2017/12/22. 
- */ +package org.apache.spark.storage import java.io.{File, FileInputStream, InputStream} import java.nio.ByteBuffer +import scala.reflect.ClassTag +import scala.util.control.NonFatal + import com.google.common.io.ByteStreams import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.util.{ByteBufferInputStream, Utils} -import scala.reflect.ClassTag -import scala.util.control.NonFatal - -/** - * @see org.apache.spark.storage.TachyonBlockManager - */ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Logging { private var chroot: Path = _ @@ -27,7 +38,7 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi private var hdfsroot: Path = _ private var usehdfs: Boolean = _ - override def toString = "ExternalBlockStore-Alluxio" + override def toString: String = "ExternalBlockStore-Alluxio" override def init(blockManager: BlockManager): Unit = { super.init(blockManager) @@ -55,7 +66,7 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi } - override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit ={ + override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = { val path = getFile(blockId) val output = fs.create(path, true) try { @@ -73,9 +84,6 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi } } - /** - * this method must be override, alluxio not support write append. - */ override def putValues(blockId: BlockId, values: Iterator[_]): Unit = { val output = fs.create(getFile(blockId), true) try { @@ -93,7 +101,7 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi } } - override def putFile(shuffleId : Int, blockId: BlockId, file : File): Unit ={ + override def putFile(shuffleId : Int, blockId: BlockId, file : File): Unit = { val in = new FileInputStream(file) val bytes = new Array[Byte](file.length().toInt) @@ -106,12 +114,10 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi override def getBytes(blockId: BlockId): Option[ByteBuffer] = { val path = getFile(blockId) if (!fs.exists(path)) { - logInfo("Test-log path is " + path.getName + "is not exsist ") None } else { val size = fs.getFileStatus(path).getLen if (size == 0) { - logInfo("Test-log path size is " + size ) None } else { val input = fs.open(path) @@ -121,13 +127,12 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi ByteStreams.readFully(input, buffer) Some(ByteBuffer.wrap(buffer)) } catch { - case _ => { + case _ => logWarning(s"Failed to get bytes of block $blockId from Alluxio") flag = false getBytesFromHdfs(blockId) - } } finally { - if(flag){ + if (flag) { try { input.close() } catch { @@ -144,7 +149,6 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi usehdfs = true val path = changePathToHDFS(getFile(blockId)) if (!hdfsFs.exists(path)) { - logInfo("Test-log path is " + path.getName + "is not exsist ") None } else { val size = hdfsFs.getFileStatus(path).getLen @@ -158,10 +162,9 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi ByteStreams.readFully(input, buffer) Some(ByteBuffer.wrap(buffer)) } catch { - case NonFatal(e) => { + case NonFatal(e) => logWarning(s"Failed to get bytes of block $blockId from HDFS", e) None - } } finally { try { input.close() @@ -175,12 +178,9 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi 
} override def createInputStream(path: Path): Option[InputStream] = { - if(usehdfs){ - logInfo("Test-log Use HDFS createInputStream") + if (usehdfs) { createInputStreamFromHDFS(path) - } - else{ - logInfo("Test-log Use Alluxio createInputStream") + } else { createInputStreamFromAlluxio(path) } } @@ -188,12 +188,10 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi def createInputStreamFromAlluxio(path: Path): Option[InputStream] = { if (!fs.exists(path)) { - logInfo("Test-log path is " + path.toString + "is not exsist ") None } else { val size = fs.getFileStatus(path).getLen if (size == 0) { - logInfo("Test-log path size is " + size ) None } else { try { @@ -211,12 +209,10 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi def createInputStreamFromHDFS(path: Path): Option[InputStream] = { val hdfsPath = changePathToHDFS(path) if (!hdfsFs.exists(hdfsPath)) { - logInfo("Test-log HDFS path is " + hdfsPath.toString + "is not exsist ") None } else { val size = hdfsFs.getFileStatus(hdfsPath).getLen if (size == 0) { - logInfo("Test-log path size is " + size ) None } else { try { @@ -237,9 +233,9 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi } override def getSize(path: Path): Long = { - val size = if(!fs.exists(path)){ + val size = if (!fs.exists(path)) { fs.getFileStatus(path).getLen - }else 0L + } else 0L size } @@ -249,7 +245,8 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi bytes.map(bs => // alluxio.hadoop.HdfsFileInputStream#available unsupport! // blockManager.dataDeserialize(blockId, input) - blockManager.serializerManager.dataDeserializeStream(blockId, inputStream = new ByteBufferInputStream(bs))(ct) + blockManager.serializerManager.dataDeserializeStream(blockId, + inputStream = new ByteBufferInputStream(bs))(ct) ) } @@ -275,12 +272,12 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi val subDir = subDirs(dirId).synchronized { val old = subDirs(dirId)(subDirId) - if (old != null) - old - else { - val path = new Path(chroot, s"$dirId/"+subDirId.toString) + if (old != null) { + old + } else { + val path = new Path(chroot, s"$dirId/" + subDirId.toString) subDirs(dirId)(subDirId) = path - hdfsDirs(dirId)(subDirId) = new Path(hdfsroot, s"$dirId/"+subDirId.toString) + hdfsDirs(dirId)(subDirId) = new Path(hdfsroot, s"$dirId/" + subDirId.toString) path } } @@ -297,4 +294,4 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi new Path(subDir, blockId.name) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala index 4f2ddf23faf72..cb76b7de92f4c 100644 --- a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala @@ -1,25 +1,34 @@ -package org.apache.spark.storage - -/** - * Created by Chopin on 2018/3/27. - */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging - -/** - * @see org.apache.spark.storage.TachyonBlockManager - */ private[spark] class AlluxioBlockManagerMaster extends Logging { private var chroot: Path = _ private var fs : FileSystem = _ - override def toString = "ExternalBlockStore-Alluxio" + override def toString: String = "ExternalBlockStore-Alluxio" def init(conf: SparkConf): Unit = { @@ -30,34 +39,34 @@ private[spark] class AlluxioBlockManagerMaster extends Logging { ExternalBlockStore.SUB_DIRS_PER_DIR.toInt) val folderName = conf.get(ExternalBlockStore.FOLD_NAME) - val master = new Path(masterUrl) chroot = new Path(master, s"$storeDir/$folderName/" + conf.getAppId ) fs = master.getFileSystem(new Configuration) - if(!fs.exists(chroot)) + if (!fs.exists(chroot)) { fs.mkdirs(chroot) + } mkdir(subDirsPerAlluxio) } - def delete(): Unit ={ - try{ - if(fs.exists(chroot)){ - logInfo(s"Test log: delete $chroot") - fs.delete(chroot,true) + def delete(): Unit = { + try { + if (fs.exists(chroot)) { + fs.delete(chroot, true) } - }catch { + } catch { case _ => logWarning(s"$chroot has been deleted") - }finally + } finally fs.close() } def mkdir(n: Int): Unit = { - for( i <- 0 until n ){ - val path = new Path(chroot,i.toString+s"/$i") - if(!fs.exists(path)) + for (i <- 0 until n) { + val path = new Path(chroot, i.toString + s"/$i") + if (!fs.exists(path)) { fs.mkdirs(path) + } } } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 0e6cd2883b1b3..bfee57c9257ab 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -237,8 +237,7 @@ class BlockManagerMaster( if (driverEndpoint != null && isDriver) { tell(StopBlockManagerMaster) driverEndpoint = null - if(conf.getBoolean("spark.alluxio.shuffle.enabled",false)){ - logInfo("Test log: AlluxioBlockManagerMaster delete tmp file") + if (conf.getBoolean("spark.alluxio.shuffle.enabled", false)) { alluxioBlockManagerMaster.delete() } logInfo("BlockManagerMaster stopped") diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala index 38430cccea3d0..7dee03e71d8cb 100644 --- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,27 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.spark.storage + import java.io.{File, InputStream} import java.nio.ByteBuffer +import scala.reflect.ClassTag + import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.serializer.SerializerManager import org.apache.spark.util.ByteBufferInputStream -import scala.reflect.ClassTag - -/** - * An abstract class that the concrete external block manager has to inherit. - * The class has to have a no-argument constructor, and will be initialized by init, - * which is invoked by ExternalBlockStore. The main input parameter is blockId for all - * the methods, which is the unique identifier for Block in one Spark application. - * - * The underlying external block manager should avoid any name space conflicts among multiple - * Spark applications. For example, creating different directory for different applications - * by randomUUID - * - */ +/* + * An abstract class that the concrete external block manager has to inherit. + * The class has to have a no-argument constructor, and will be initialized by init, + * which is invoked by ExternalBlockStore. The main input parameter is blockId for all + * the methods, which is the unique identifier for Block in one Spark application. + * + * The underlying external block manager should avoid any name space conflicts among multiple + * Spark applications. For example, creating different directory for different applications + * by randomUUID + * + */ private[spark] abstract class ExternalBlockManager { protected var blockManager: BlockManager = _ @@ -47,46 +49,46 @@ private[spark] abstract class ExternalBlockManager { override def toString: String = {"External Block Store"} - /** - * Initialize a concrete block manager implementation. Subclass should initialize its internal - * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore - * right after the class is constructed. The function should throw IOException on failure - * - * @throws java.io.IOException if there is any file system failure during the initialization. - */ + /* + * Initialize a concrete block manager implementation. Subclass should initialize its internal + * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore + * right after the class is constructed. The function should throw IOException on failure + * + * @throws java.io.IOException if there is any file system failure during the initialization. + */ def init(blockManager: BlockManager): Unit = { this.blockManager = blockManager this.serializerManager = blockManager.serializerManager } - /** - * Drop the block from underlying external block store, if it exists.. - * @return true on successfully removing the block - * false if the block could not be removed as it was not found - * - * @throws java.io.IOException if there is any file system failure in removing the block. - */ + /* + * Drop the block from underlying external block store, if it exists.. + * @return true on successfully removing the block + * false if the block could not be removed as it was not found + * + * @throws java.io.IOException if there is any file system failure in removing the block. + */ def removeBlock(blockId: BlockId): Boolean - /** - * Used by BlockManager to check the existence of the block in the underlying external - * block store. - * @return true if the block exists. - * false if the block does not exists. - * - * @throws java.io.IOException if there is any file system failure in checking - * the block existence. 
- */ + /* + * Used by BlockManager to check the existence of the block in the underlying external + * block store. + * @return true if the block exists. + * false if the block does not exists. + * + * @throws java.io.IOException if there is any file system failure in checking + * the block existence. + */ def blockExists(blockId: BlockId): Boolean - /** - * Put the given block to the underlying external block store. Note that in normal case, - * putting a block should never fail unless something wrong happens to the underlying - * external block store, e.g., file system failure, etc. In this case, IOException - * should be thrown. - * - * @throws java.io.IOException if there is any file system failure in putting the block. - */ + /* + * Put the given block to the underlying external block store. Note that in normal case, + * putting a block should never fail unless something wrong happens to the underlying + * external block store, e.g., file system failure, etc. In this case, IOException + * should be thrown. + * + * @throws java.io.IOException if there is any file system failure in putting the block. + */ def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit def putValues(blockId: BlockId, values: Iterator[_]): Unit = { @@ -96,47 +98,48 @@ private[spark] abstract class ExternalBlockManager { def putFile(blockId: BlockId, file : File): Unit = {} def putFile(shuffleId: Int, blockId: BlockId, file : File): Unit = {} - /** - * Retrieve the block bytes. - * @return Some(ByteBuffer) if the block bytes is successfully retrieved - * None if the block does not exist in the external block store. - * - * @throws java.io.IOException if there is any file system failure in getting the block. - */ + /* + * Retrieve the block bytes. + * @return Some(ByteBuffer) if the block bytes is successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ def getBytes(blockId: BlockId): Option[ByteBuffer] - /** - * Retrieve the block data. - * @return Some(Iterator[Any]) if the block data is successfully retrieved - * None if the block does not exist in the external block store. - * - * @throws java.io.IOException if there is any file system failure in getting the block. - */ + /* + * Retrieve the block data. + * @return Some(Iterator[Any]) if the block data is successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ def getValues(blockId: BlockId): Option[Iterator[_]] = { val ct = implicitly[ClassTag[Iterator[_]]] - getBytes(blockId).map(buffer => serializerManager.dataDeserializeStream(blockId, new ByteBufferInputStream(buffer))(ct)) + getBytes(blockId).map(buffer => serializerManager.dataDeserializeStream(blockId, + new ByteBufferInputStream(buffer))(ct)) } def createInputStream(path: Path): Option[InputStream] def getSize(path: Path): Long = 0L - /** - * Get the size of the block saved in the underlying external block store, - * which is saved before by putBytes. - * @return size of the block - * 0 if the block does not exist - * - * @throws java.io.IOException if there is any file system failure in getting the block size. - */ + /* + * Get the size of the block saved in the underlying external block store, + * which is saved before by putBytes. 
+   * @return size of the block
+   *         0 if the block does not exist
+   *
+   * @throws java.io.IOException if there is any file system failure in getting the block size.
+   */
   def getSize(blockId: BlockId): Long
 
-  /**
-   * Clean up any information persisted in the underlying external block store,
-   * e.g., the directory, files, etc,which is invoked by the shutdown hook of ExternalBlockStore
-   * during system shutdown.
-   *
-   */
+  /*
+   * Clean up any information persisted in the underlying external block store,
+   * e.g., the directory, files, etc., which is invoked by the shutdown hook of ExternalBlockStore
+   * during system shutdown.
+   *
+   */
   def shutdown()
 
   def delete(): Unit = {}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index 637af57b4c578..5379a48898acb 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -17,17 +17,16 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ShutdownHookManager, Utils}
-
 import scala.util.control.NonFatal
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
-/**
- * Stores BlockManager blocks on ExternalBlockStore.
- * We capture any potential exception from underlying implementation
- * and return with the expected failure value
- */
+/*
+ * Stores BlockManager blocks on ExternalBlockStore.
+ * We capture any potential exception from the underlying implementation
+ * and return the expected failure value.
+ */
 private[spark] class ExternalBlockStore(blockManager: BlockManager) extends Logging {
 
   lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
@@ -65,4 +64,4 @@ private[spark] object ExternalBlockStore extends Logging {
   val MASTER_URL = "spark.externalBlockStore.url"
   val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
   val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.AlluxioBlockManager"
-}
\ No newline at end of file
+}

From 20cabe1419f6eb382089a6faecede6cb420619d9 Mon Sep 17 00:00:00 2001
From: xiaobang213452
Date: Mon, 6 Aug 2018 16:17:42 +0800
Subject: [PATCH 3/3] update style

---
 .../org/apache/spark/storage/AlluxioBlockManager.scala     | 4 ++--
 .../apache/spark/storage/AlluxioBlockManagerMaster.scala   | 6 ++++--
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 1 -
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala
index ca6c8742f0e0f..b1668e3fd0d2b 100644
--- a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManager.scala
@@ -127,8 +127,8 @@ private[spark] class AlluxioBlockManager extends ExternalBlockManager with Loggi
         ByteStreams.readFully(input, buffer)
         Some(ByteBuffer.wrap(buffer))
       } catch {
-        case _ =>
-          logWarning(s"Failed to get bytes of block $blockId from Alluxio")
+        case NonFatal(e) =>
+          logWarning(s"Failed to get bytes of block $blockId from Alluxio", e)
           flag = false
           getBytesFromHdfs(blockId)
       } finally {
diff --git a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala
index cb76b7de92f4c..7c2ac2e6824ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/AlluxioBlockManagerMaster.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.storage
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -54,8 +56,8 @@ private[spark] class AlluxioBlockManagerMaster extends Logging {
           fs.delete(chroot, true)
         }
       } catch {
-        case _ =>
-          logWarning(s"$chroot has been deleted")
+        case NonFatal(e) =>
+          logWarning(s"$chroot has been deleted", e)
       } finally fs.close()
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 5f9c226aac530..17c0be1f7e134 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -31,7 +31,6 @@ import org.apache.spark.{AlluxioManagedBuffer, SparkEnv, SparkException, TaskCon
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
 import org.apache.spark.shuffle.{FetchFailedException, IndexShuffleBlockResolver}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
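
For context on the hunks above: the getBytes path in AlluxioBlockManager reads a whole block file from the external store and, on any non-fatal failure, falls back to getBytesFromHdfs instead of failing the fetch. Below is a minimal, self-contained sketch of that read pattern, written against only the Hadoop FileSystem API; the ExternalStoreReadSketch object, its readBlock helper, and the alluxio:// path are illustrative assumptions and are not part of this patch.

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// Illustrative helper only; not part of this patch.
object ExternalStoreReadSketch {

  // Read an entire block file from an HDFS-compatible store into a ByteBuffer.
  // A missing or empty file is reported as None ("block not found"), so the
  // caller can fall back to another source instead of failing the fetch.
  def readBlock(fs: FileSystem, file: Path): Option[ByteBuffer] = {
    if (!fs.exists(file)) {
      None
    } else {
      val size = fs.getFileStatus(file).getLen
      if (size == 0L) {
        None
      } else {
        val in = fs.open(file)
        try {
          val buffer = new Array[Byte](Math.toIntExact(size))
          IOUtils.readFully(in, buffer, 0, buffer.length)
          Some(ByteBuffer.wrap(buffer))
        } finally {
          in.close()
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical location; in the patch the per-application directory comes
    // from spark.externalBlockStore.url plus a random UUID. Resolving an
    // alluxio:// URI requires the Alluxio client on the classpath.
    val path = new Path("alluxio://master:19998/spark/app-dir/shuffle_0_0_0.data")
    val fs = path.getFileSystem(new Configuration())
    readBlock(fs, path).foreach(buf => println(s"read ${buf.remaining()} bytes"))
  }
}

Returning None (or an empty buffer) for missing data mirrors the design visible in the AlluxioBlockManager hunk: a failed external read degrades to the HDFS fallback rather than surfacing an exception to the shuffle fetcher.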