From 08b32ba474ba1381838575c0ed8b340c2e3f1b32 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 5 Dec 2014 10:56:20 +0800
Subject: [PATCH 1/9] Pending unroll memory for this block until tryToPut

---
 .../scala/org/apache/spark/storage/MemoryStore.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71305a46bf570..1dfab1cf77ad5 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -46,6 +46,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on `accountingLock`
   private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

   /**
    * The amount of space ensured for unrolling values in memory, shared across all cores.
@@ -328,6 +329,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     accountingLock.synchronized {
+      releasePendingUnrollMemoryForThisThread()
       val freeSpaceResult = ensureFreeSpace(blockId, size)
       val enoughFreeSpace = freeSpaceResult.success
       droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -459,6 +461,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       if (memory < 0) {
         unrollMemoryMap.remove(threadId)
       } else {
+        pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId) + memory
         unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
         // If this thread claims no more unroll memory, release it completely
         if (unrollMemoryMap(threadId) <= 0) {
@@ -468,11 +471,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }

+  def releasePendingUnrollMemoryForThisThread(): Unit = {
+    val threadId = Thread.currentThread().getId
+    accountingLock.synchronized {
+      pendingUnrollMemoryMap.remove(threadId)
+    }
+  }
+
   /**
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
   def currentUnrollMemory: Long = accountingLock.synchronized {
-    unrollMemoryMap.values.sum
+    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
   }

   /**
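Note: the idea behind patch 1 (SPARK-4777) is that, once a block has been fully unrolled, the bytes backing the resulting array should keep counting against free memory until tryToPut has actually cached the block, instead of being handed back to the pool the moment unrolling succeeds. The following is a minimal, self-contained sketch of that bookkeeping — hypothetical code, not the real MemoryStore; only the map and method names are taken from the patch.

    import scala.collection.mutable

    object PendingUnrollAccounting {
      private val accountingLock = new Object
      // Bytes a thread is still using to unroll a block (the pre-existing map).
      private val unrollMemoryMap = mutable.HashMap[Long, Long]()
      // Bytes of an already-unrolled block that has not been handed to the cache yet (the new map).
      private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

      private def threadId: Long = Thread.currentThread().getId

      // Called once unrolling succeeds: the reservation is parked as "pending".
      def reservePendingUnrollMemoryForThisThread(bytes: Long): Unit = accountingLock.synchronized {
        pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + bytes
      }

      // Called by tryToPut once the block is actually cached (or dropped).
      def releasePendingUnrollMemoryForThisThread(): Unit = accountingLock.synchronized {
        pendingUnrollMemoryMap.remove(threadId)
      }

      // Both maps count against free memory, as in the patched currentUnrollMemory.
      def currentUnrollMemory: Long = accountingLock.synchronized {
        unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
      }
    }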
From f664317a3a85929e50daea411967f953a56fb7d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Mon, 8 Dec 2014 15:15:58 +0800
Subject: [PATCH 2/9] Make sure not to add pending in every releaseUnrollMemory call

---
 .../main/scala/org/apache/spark/storage/MemoryStore.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1dfab1cf77ad5..369daa83c55c0 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -289,7 +289,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // we release the memory claimed by this thread later on when the task finishes.
       if (keepUnrolling) {
         val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
-        releaseUnrollMemoryForThisThread(amountToRelease)
+        releaseUnrollMemoryForThisThread(amountToRelease, true)
       }
     }
   }
@@ -455,13 +455,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Release memory used by this thread for unrolling blocks.
    * If the amount is not specified, remove the current thread's allocation altogether.
    */
-  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+  def releaseUnrollMemoryForThisThread(memory: Long = -1L, pending: Boolean = false): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
       if (memory < 0) {
         unrollMemoryMap.remove(threadId)
       } else {
-        pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId) + memory
+        if (pending) {
+          pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId) + memory
+        }
         unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
         // If this thread claims no more unroll memory, release it completely
         if (unrollMemoryMap(threadId) <= 0) {

From 3323c4594d3364d245f0635fe27c250b91e5c9d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Thu, 11 Dec 2014 10:28:37 +0800
Subject: [PATCH 3/9] Refine getOrElse

---
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 369daa83c55c0..199f072c6006b 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -462,7 +462,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         unrollMemoryMap.remove(threadId)
       } else {
         if (pending) {
-          pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId) + memory
+          pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
         }
         unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
         // If this thread claims no more unroll memory, release it completely
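Note: patch 3 is a one-character-class fix to patch 2: `pendingUnrollMemoryMap.getOrElse(threadId) + memory` passes no default value, which Scala's `Map.getOrElse` requires, so as written it should not even compile; patch 3 supplies `0L` so a thread's first reservation starts from zero. A small standalone illustration with made-up names:

    import scala.collection.mutable

    object GetOrElseDemo extends App {
      val pending = mutable.HashMap[Long, Long]()
      val tid = Thread.currentThread().getId

      // getOrElse needs an explicit default; 0L makes the first reservation accumulate from zero.
      pending(tid) = pending.getOrElse(tid, 0L) + 4096L
      pending(tid) = pending.getOrElse(tid, 0L) + 1024L
      assert(pending(tid) == 5120L)
      println(pending(tid)) // 5120
    }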
From 3a3f2c85b19a682663ae29dcd50d0f524da26849 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Thu, 11 Dec 2014 15:08:41 +0800
Subject: [PATCH 4/9] Refine blockManagerSuite unroll test

---
 .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ffe6f039145ea..3fdbe99b5d02b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1064,6 +1064,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.releasePendingUnrollMemoryForThisThread()

     // Unroll with not enough space. This should succeed after kicking out someBlock1.
     store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
@@ -1074,6 +1075,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(droppedBlocks.size === 1)
     assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
     droppedBlocks.clear()
+    memoryStore.releasePendingUnrollMemoryForThisThread()

     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.

From 0fc2bec6c52bfba050cb40963723beaaae7ce898 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Mon, 12 Jan 2015 11:54:09 +0800
Subject: [PATCH 5/9] Release pending unroll memory after put block in memoryStore

---
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 199f072c6006b..578c6a452fdfb 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -329,7 +329,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     accountingLock.synchronized {
-      releasePendingUnrollMemoryForThisThread()
       val freeSpaceResult = ensureFreeSpace(blockId, size)
       val enoughFreeSpace = freeSpaceResult.success
       droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -355,6 +354,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
+      releasePendingUnrollMemoryForThisThread()
     }
     ResultWithDroppedBlocks(putSuccess, droppedBlocks)
   }
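Note: in the suite, `unrollSafely` is called directly and the resulting array is never handed to `tryToPut`, so nothing ever clears the pending reservation; the two added `releasePendingUnrollMemoryForThisThread()` calls do that by hand so later unrolls see the expected amount of free memory. The standalone model below (hypothetical code, not the suite) shows the state the assertions rely on: the per-thread unroll counter is back to zero while the bytes still count globally until they are released.

    import scala.collection.mutable

    object WhyTheTestReleasesPending extends App {
      val tid = Thread.currentThread().getId
      val unrollMemoryMap = mutable.HashMap[Long, Long]()
      val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

      // After a successful unroll the per-thread unroll counter reads zero...
      unrollMemoryMap.remove(tid)
      pendingUnrollMemoryMap(tid) = 2400L
      assert(unrollMemoryMap.getOrElse(tid, 0L) == 0L)

      // ...but the bytes still count globally until tryToPut (or the test) releases them.
      def currentUnrollMemory: Long = unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
      assert(currentUnrollMemory == 2400L)

      pendingUnrollMemoryMap.remove(tid) // what releasePendingUnrollMemoryForThisThread() does
      assert(currentUnrollMemory == 0L)
    }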
From 0fd02137730820505c066c40f40aed843bb67532 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 16 Jan 2015 11:51:21 +0800
Subject: [PATCH 6/9] add comments

---
 .../main/scala/org/apache/spark/storage/MemoryStore.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 578c6a452fdfb..7f817c5ab8c11 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -284,8 +284,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }

     } finally {
-      // If we return an array, the values returned do not depend on the underlying vector and
-      // we can immediately free up space for other threads. Otherwise, if we return an iterator,
+      // If we return an array, the values returned will be used in tryToPut()
+      // and finally put into memoryStore.entrys, so we pending unroll memory
+      // for this thread and release it in tryToPut(). Otherwise, if we return an iterator,
       // we release the memory claimed by this thread later on when the task finishes.
       if (keepUnrolling) {
         val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
         releaseUnrollMemoryForThisThread(amountToRelease, true)
       }
     }
   }
@@ -330,6 +330,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     accountingLock.synchronized {
+      releasePendingUnrollMemoryForThisThread()
       val freeSpaceResult = ensureFreeSpace(blockId, size)
       val enoughFreeSpace = freeSpaceResult.success
       droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -354,7 +356,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
-      releasePendingUnrollMemoryForThisThread()
     }
     ResultWithDroppedBlocks(putSuccess, droppedBlocks)
   }
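Note: the comment this patch rewrites is the heart of the design — which side releases the unroll reservation depends on whether unrollSafely produced the fully materialized array (it will be cached by tryToPut, which then drops the pending reservation) or fell back to an iterator (the reservation lives until the task finishes). A toy sketch of that split, with made-up names and counters standing in for the real bookkeeping:

    object UnrollOutcome extends App {
      var pendingBytes = 0L    // released by tryToPut right after the block is cached
      var taskScopedBytes = 0L // released only when the task finishes

      // An Array result is about to be cached; an Iterator result is not, so the two
      // reservations have different lifetimes.
      def finishUnroll(result: Either[Array[Int], Iterator[Int]], bytes: Long): Unit = result match {
        case Left(_)  => pendingBytes += bytes
        case Right(_) => taskScopedBytes += bytes
      }

      finishUnroll(Left(Array(1, 2, 3)), 2400L)
      finishUnroll(Right(Iterator(4, 5)), 800L)
      println(s"pending=$pendingBytes taskScoped=$taskScopedBytes") // pending=2400 taskScoped=800
    }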
From 39960d09f72e927809f7d173452220edc9a3f92e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 27 Feb 2015 10:36:20 +0800
Subject: [PATCH 7/9] Refine comments

---
 .../apache/spark/storage/MemoryStore.scala | 37 +++++++++++++------
 1 file changed, 26 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 7f817c5ab8c11..1b65d98f9b803 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -46,6 +46,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on `accountingLock`
   private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
+  // Pending unroll memory refers to the intermediate memory occupied by a thread
+  // after the unroll but before the actual putting of the block in the cache.
+  // This chunk of memory is expected to be released *as soon as* we finish
+  // caching the corresponding block as opposed to until after the task finishes.
+  // This is only used if a block is successfully unrolled in its entirety in
+  // memory (SPARK-4777).
   private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

   /**
    * The amount of space ensured for unrolling values in memory, shared across all cores.
@@ -284,13 +291,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }

     } finally {
-      // If we return an array, the values returned will be used in tryToPut()
-      // and finally put into memoryStore.entrys, so we pending unroll memory
-      // for this thread and release it in tryToPut(). Otherwise, if we return an iterator,
-      // we release the memory claimed by this thread later on when the task finishes.
+      // If we return an array, the values returned will later be cached in `tryToPut`.
+      // In this case, we should release the memory after we cache the block there.
+      // Otherwise, if we return an iterator, we release the memory reserved here
+      // later when the task finishes.
       if (keepUnrolling) {
-        val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
-        releaseUnrollMemoryForThisThread(amountToRelease, true)
+        accountingLock.synchronized {
+          val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+          releaseUnrollMemoryForThisThread(amountToRelease)
+          reservePendingUnrollMemoryForThisThread(amountToRelease)
+        }
       }
     }
   }
@@ -330,7 +340,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     accountingLock.synchronized {
-      releasePendingUnrollMemoryForThisThread()
       val freeSpaceResult = ensureFreeSpace(blockId, size)
       val enoughFreeSpace = freeSpaceResult.success
       droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -356,6 +365,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
+      // Release the unroll memory used because we no longer need the underlying Array
+      releasePendingUnrollMemoryForThisThread()
     }
     ResultWithDroppedBlocks(putSuccess, droppedBlocks)
   }
@@ -456,15 +467,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Release memory used by this thread for unrolling blocks.
    * If the amount is not specified, remove the current thread's allocation altogether.
    */
-  def releaseUnrollMemoryForThisThread(memory: Long = -1L, pending: Boolean = false): Unit = {
+  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
       if (memory < 0) {
         unrollMemoryMap.remove(threadId)
       } else {
-        if (pending) {
-          pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
-        }
         unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
         // If this thread claims no more unroll memory, release it completely
         if (unrollMemoryMap(threadId) <= 0) {
@@ -474,6 +482,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }

+  def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
+    val threadId = Thread.currentThread().getId
+    accountingLock.synchronized {
+      pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
+    }
+  }
+
   def releasePendingUnrollMemoryForThisThread(): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
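Note: patch 7 drops the `pending` flag and instead pairs releaseUnrollMemoryForThisThread with the new reservePendingUnrollMemoryForThisThread, wrapping both in accountingLock.synchronized so the hand-off is atomic — a reader of the combined unroll total can never catch the bytes absent from both maps. A minimal sketch of that invariant (hypothetical code, not the real class):

    import scala.collection.mutable

    object AtomicHandOff extends App {
      private val accountingLock = new Object
      private val unrollMemoryMap = mutable.HashMap[Long, Long]()
      private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

      // Release from one map and reserve in the other inside a single critical section.
      def handOff(tid: Long, bytes: Long): Unit = accountingLock.synchronized {
        unrollMemoryMap(tid) = unrollMemoryMap.getOrElse(tid, 0L) - bytes
        if (unrollMemoryMap(tid) <= 0L) unrollMemoryMap.remove(tid)
        pendingUnrollMemoryMap(tid) = pendingUnrollMemoryMap.getOrElse(tid, 0L) + bytes
      }

      def total: Long = accountingLock.synchronized {
        unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
      }

      val tid = Thread.currentThread().getId
      accountingLock.synchronized { unrollMemoryMap(tid) = 4096L }
      handOff(tid, 4096L)
      assert(total == 4096L) // the reservation changed buckets but never disappeared
    }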
From 407b2c9a18f3cae6d82afacf68927ce1bcf1140e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 27 Feb 2015 11:15:14 +0800
Subject: [PATCH 8/9] Refine according to comments

---
 .../scala/org/apache/spark/storage/MemoryStore.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1b65d98f9b803..e1eeed64b5510 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -395,6 +395,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }

     // Take into account the amount of memory currently occupied by unrolling blocks
+    // and minus the pending unroll memory for that block on current thread.
+    val threadId = Thread.currentThread().getId
     val actualFreeMemory = freeMemory - currentUnrollMemory

     if (actualFreeMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
@@ -484,6 +486,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }

+  /**
+   * Reserve the unroll memory of current unroll successful block used by this thread
+   * until actually put the block into memory entry.
+   */
   def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
@@ -489,6 +495,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }

+  /**
+   * Release pending unroll memory of current unroll successful block used by this thread
+   */
   def releasePendingUnrollMemoryForThisThread(): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
@@ -500,7 +509,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
   def currentUnrollMemory: Long = accountingLock.synchronized {
-    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
+    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.
+      filter(_._1 != Thread.currentThread().getId).values.sum
   }

   /**

From 809cc41d2bc7910771c75f98f585af2ff5d12284 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 27 Feb 2015 18:08:29 +0800
Subject: [PATCH 9/9] Refine

---
 .../main/scala/org/apache/spark/storage/MemoryStore.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index e1eeed64b5510..5560afed4e7b0 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -397,7 +397,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     // Take into account the amount of memory currently occupied by unrolling blocks
     // and minus the pending unroll memory for that block on current thread.
     val threadId = Thread.currentThread().getId
-    val actualFreeMemory = freeMemory - currentUnrollMemory
+    val actualFreeMemory = freeMemory - currentUnrollMemory +
+      pendingUnrollMemoryMap.getOrElse(threadId, 0L)

     if (actualFreeMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
@@ -509,8 +510,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
   def currentUnrollMemory: Long = accountingLock.synchronized {
-    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.
-      filter(_._1 != Thread.currentThread().getId).values.sum
+    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
   }

   /**
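Note: patches 8 and 9 address the double counting that appears once the pending reservation survives into ensureFreeSpace: the unrolled block's bytes are still in pendingUnrollMemoryMap and are also the `space` being requested for that same block. Patch 8 first tried to exclude the current thread inside currentUnrollMemory; patch 9 reverts that and instead adds the thread's pending amount back when computing actualFreeMemory, keeping currentUnrollMemory a plain global total. Worked numbers, made up purely for illustration:

    object FreeMemoryArithmetic extends App {
      // Toy figures: a 4000-byte block was just unrolled by this thread and is still pending.
      val freeMemory          = 5000L // maxMemory minus the entries already cached
      val pendingForThisBlock = 4000L // this thread's pendingUnrollMemoryMap entry
      val otherThreadsUnroll  =  500L
      val currentUnrollMemory = otherThreadsUnroll + pendingForThisBlock
      val space               = 4000L // what ensureFreeSpace is asked to make room for

      // Without the adjustment the block is charged twice: once as pending unroll memory
      // and once as the space being requested for it.
      val naiveFree  = freeMemory - currentUnrollMemory                       // 500
      // Patch 8/9: add the current thread's pending bytes back before the comparison.
      val actualFree = freeMemory - currentUnrollMemory + pendingForThisBlock // 4500
      assert(naiveFree < space && actualFree >= space)
      println(s"naive=$naiveFree actual=$actualFree")
    }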